Skip to content

Commit

Permalink
move duplicate check to own route
Browse files Browse the repository at this point in the history
  • Loading branch information
codingPF committed Oct 15, 2023
1 parent 63fb7e1 commit 3848b4d
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class FilmPersistenceService {
private final FilmRepository filmRepository;

public <S extends Film> Iterable<S> saveAllMergeIfExists(Iterable<S> entities) {
//LOG.info("input size " + StreamSupport.stream(entities.spliterator(), false).count());
LOG.info("FilmPersistenceService input size " + StreamSupport.stream(entities.spliterator(), false).count());
// for some reason the reference needs to be handled by us
entities.forEach(film -> { film.insertTimestamp = LocalDateTime.now();film.urls.forEach(url -> { url.film = ((Film)film); });});

Expand All @@ -34,6 +34,7 @@ public <S extends Film> Iterable<S> saveAllMergeIfExists(Iterable<S> entities) {
//updateUuidIfFilmsAlreadyExists(modifiedEntities);
//LOG.info("updated ids for existing " + modifiedEntities.size());
Iterable<S> s = filmRepository.saveAll(modifiedEntities);

//LOG.info("saveAll ids for existing " + modifiedEntities.size());
return s;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
@Transactional
public interface FilmRepository extends CrudRepository<Film, UUID> {

// OOM
//EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH)
@Transactional(readOnly = true)
@EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH)
List<Film> findByUuidNotNull();

@EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH)
List<Film> findBySender(Sender sender);

Optional<Film> findBySenderAndTitelIgnoreCaseAndThemaIgnoreCaseAndDuration(
Sender sender, String titel, String thema, Duration duration);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package de.mediathekview.fimlistmerger.persistence;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.Table;

/**
 * Removes duplicate {@link Film} rows from the database, keeping only the most
 * recently inserted film per (sender, thema, titel, duration) key.
 *
 * <p>The cleanup runs as one native multi-statement SQL script inside the
 * surrounding transaction: duplicate uuids are collected into a scratch table,
 * the dependent join tables are purged first (to satisfy foreign keys), then
 * the film rows themselves, and finally the scratch table is dropped.
 */
@Service
@Transactional
public class FilmlistMerge {
    // One logger per class, not per bean instance.
    private static final Logger LOG = LoggerFactory.getLogger(FilmlistMerge.class);

    @Autowired
    EntityManager entityManager;

    /**
     * Builds and executes the duplicate-removal SQL script in the current
     * transaction. The update count of the script's last statement is logged
     * for diagnostics.
     */
    public void removeOldEntries() {
        String sql = createMergeQuery().toString();
        LOG.info("generated sql {}", sql); // typo "geneated" fixed; parameterized logging
        Query query = entityManager.createNativeQuery(sql);
        int rs = query.executeUpdate();
        LOG.info("executed sql rs: {}", rs);
    }

    /**
     * Assembles the multi-statement cleanup script. No external input is
     * interpolated into the SQL — only a generated scratch-table name and the
     * {@code @Table} name of {@link Film} — so string concatenation is safe here.
     *
     * <p>NOTE(review): the {@code audioDescriptions}/{@code signLanguages}
     * collections referenced elsewhere are not purged here — confirm they are
     * not stored in separate FK-constrained join tables, or deleting from the
     * film table will fail.
     *
     * @return the SQL script (several ';'-separated statements)
     */
    private StringBuilder createMergeQuery() {
        // Unique-ish scratch table name; two runs within the same millisecond
        // would collide, but this runs once per import.
        String tablename = "T" + System.currentTimeMillis();
        // Hoisted: the annotation lookup was previously performed twice.
        String filmTable = Film.class.getAnnotation(Table.class).name();
        StringBuilder sql = new StringBuilder();
        sql
            .append("create table ").append(tablename).append("(id uuid, insert_date timestamp);");
        // Collect every uuid that is NOT the newest of its
        // (sender, thema, titel, duration) group (row_number() desc, rn > 1).
        sql
            .append("insert into ")
            .append(tablename)
            .append(" select uuid, insert_timestamp from (")
            .append(" select uuid, insert_timestamp, row_number() over (partition by sender, thema, titel, duration order by insert_timestamp desc) rn")
            .append(" from ").append(filmTable)
            .append(") oldEntries")
            .append(" where rn > 1")
            .append(";");
        // Purge the dependent join tables before the parent film rows.
        sql
            .append("delete from ").append("film_geo_locations")
            .append(" where film_uuid in (select id from ").append(tablename).append(")")
            .append(";");
        sql
            .append("delete from ").append("film_subtitles")
            .append(" where film_uuid in (select id from ").append(tablename).append(")")
            .append(";");
        sql
            .append("delete from ").append("film_url")
            .append(" where film_uuid in (select id from ").append(tablename).append(")")
            .append(";");
        sql
            .append("delete from ").append(filmTable)
            .append(" where uuid in (select id from ").append(tablename).append(")")
            .append(";");
        sql
            .append("drop table ").append(tablename).append(";");
        return sql;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package de.mediathekview.fimlistmerger.routes;

import de.mediathekview.fimlistmerger.persistence.FilmlistMerge;
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

/**
 * Camel route that runs after the filmlist has been imported into the database.
 * It invokes {@link FilmlistMerge}'s {@code removeOldEntries} bean method to
 * delete duplicate film rows, then logs completion.
 */
@Component
@RequiredArgsConstructor // Lombok generates a constructor for the final field; Spring injects through it
public class AfterDatabaseImportRoute extends RouteBuilder {
    // Route id used to reference this route for monitoring/management.
    public static final String ROUTE_ID = "AfterDatabaseImportRoute";
    // Synchronous in-process endpoint other routes send to in order to trigger the cleanup.
    public static final String DIRECT_AFTER_DATABASE_IMPORT = "direct:afterDatabaseImportRoute";

    private final FilmlistMerge filmlistMerge;

    /**
     * Configures the route: consume from the direct endpoint, call
     * {@code FilmlistMerge.removeOldEntries()} as a bean, log when done.
     */
    @Override
    public void configure() {
        from(DIRECT_AFTER_DATABASE_IMPORT)
            .routeId(ROUTE_ID)
            .bean(filmlistMerge, "removeOldEntries")
            .log("Removed old entries")
        ;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public void configure() {
.routeId(ROUTE_ID)
.to(Metrics.TIMER_WRITE_FILM_START.toString())
.onCompletion()
.log("Data stored in DB")
.to(Metrics.TIMER_WRITE_FILM_STOP.toString())
.end()
.process(filmToPersistenceFilmProcessor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public void configure() {
.id(SWITCH_ON_FILMLIST_FORMAT_ROUTING_TARGET)
.onCompletion()
.onCompleteOnly()
.log(LoggingLevel.INFO, "Update filmlist database")
.to(AfterDatabaseImportRoute.DIRECT_AFTER_DATABASE_IMPORT)
.id("AfterDatabaseImportRoute")
.log(LoggingLevel.INFO, "Completed reading files, writing consolidated filmlist now.")
.to(WriteConsolidatedFilmlistRoute.ROUTE_FROM)
.id(WRITE_CONSOLIDATED_FILMLIST_ROUTING_TARGET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void configure() {
.setBody()
//.method(filmRepository, "findAll")
.method(filmRepository, "findByUuidNotNull")
//.method(filmRepository, "findBySender(ARD)")
.process(persistenceFilmsToFilmlistProcessor)
.log(LoggingLevel.INFO, "... finished reading from DB")
.id(WRITE_CONSOLIDATED_FILMLIST_PERSISTENCE_FILMS_TO_FILMLIST_PROCESSOR)
Expand Down
24 changes: 13 additions & 11 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# General
camel.dataformat.json-jackson.auto-discover-object-mapper=true
camel.springboot.main-run-controller=true
camel.threadpool.pool-size = 100
camel.threadpool.max-pool-size = 100
FilmlistSplitterBean.messageChunkSize = 100
camel.threadpool.pool-size = 10
camel.threadpool.max-pool-size = 10
FilmlistSplitterBean.messageChunkSize = 10000

logging.level.liquibase=WARN
logging.level.root=INFO
Expand All @@ -17,11 +17,19 @@ logging.level.org.apache.camel=INFO

camel.component.metrics.enabled=true

spring.datasource.hikari.maximum-pool-size=500
spring.datasource.hikari.maximum-pool-size=100

# Database
spring.datasource.url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true&reWriteBatchedInserts=true
#spring.datasource.jdbc-url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true
spring.datasource.username=filmlistmerger
spring.datasource.password=filmlistmerger
spring.datasource.hikari.auto-commit=false
spring.jpa.properties.hibernate.jdbc.fetch_size=1000

spring.jpa.database-platform=org.hibernate.dialect.PostgreSQL9Dialect
spring.jpa.hibernate.ddl-auto=none
spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.jdbc.batch_size=10000
spring.jpa.properties.hibernate.order_updates = true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data = true
spring.jpa.properties.hibernate.order_inserts=true
Expand All @@ -41,12 +49,6 @@ management.metrics.export.influx.org=mediathekview
management.metrics.export.influx.token=
management.metrics.export.influx.uri=http://localhost:8086

# Database
spring.datasource.url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true
#spring.datasource.jdbc-url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true
spring.datasource.username=filmlistmerger
spring.datasource.password=filmlistmerger

# Application
filmlistmerger.input.path=input
filmlistmerger.output.format=OLD
Expand Down

0 comments on commit 3848b4d

Please sign in to comment.