Skip to content

Commit

Permalink
DSC-1459 implement queuing mechanism to update related item
Browse files Browse the repository at this point in the history
  • Loading branch information
abollini committed Mar 9, 2024
1 parent 0b64e6d commit 03b269b
Show file tree
Hide file tree
Showing 17 changed files with 759 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://www.dspace.org/license/
*/
package org.dspace.content.dao;

import java.sql.SQLException;
import java.util.UUID;

import org.dspace.core.Context;

public interface ItemForMetadataEnhancementUpdateDAO {

/**
* Add to the metadata_enhancement_update table queue an entry of each items
* that is potentially affected by the update of the Item with the specified
* uuid. The items potentially affected are the one that have the provided uuid
* as value of a cris.virtualsource.* metadata
*
* @param context the DSpace Context object
* @param uuid the uuid of the updated item
* @return the number of affected items scheduled for update
* @throws SQLException if a problem with the database occurs
*/
int saveAffectedItemsForUpdate(Context context, UUID uuid);

/**
* Remove from the metadata_enhancement_update table queue the entry if any
* related to the specified id in older than the current date
*
* @param context the DSpace Context object
* @param itemToRemove the uuid of the processed item
* @throws SQLException if a problem with the database occurs
*/
void removeItemForUpdate(Context context, UUID itemToRemove);

/**
* Extract and remove from the table the first uuid to process from the
* itemupdate_metadata_enhancement table ordered by date queued asc (older
* first)
*
* @param context
* @return
*/
UUID pollItemToUpdate(Context context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://www.dspace.org/license/
*/
package org.dspace.content.dao.impl;

import java.sql.SQLException;
import java.util.UUID;

import org.dspace.content.MetadataSchema;
import org.dspace.content.dao.ItemForMetadataEnhancementUpdateDAO;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.content.service.MetadataSchemaService;
import org.dspace.core.Context;
import org.dspace.core.DBConnection;
import org.dspace.services.ConfigurationService;
import org.dspace.utils.DSpace;
import org.hibernate.Session;
import org.hibernate.query.NativeQuery;
import org.springframework.beans.factory.annotation.Autowired;

/**
* Hibernate implementation of the Database Access Object interface class for
* the ItemForMetadataEnhancementUpdate object. This class is responsible for
* all database calls for the ItemForMetadataEnhancementUpdate object and is
* autowired by spring This class should never be accessed directly.
*/
public class ItemForMetadataEnhancementUpdateDAOImpl implements ItemForMetadataEnhancementUpdateDAO {
@Autowired
ConfigurationService configurationService;

@Override
public void removeItemForUpdate(Context context, UUID itemToRemove) {
try {
Session session = getHibernateSession();
String sql = "DELETE FROM itemupdate_metadata_enhancement WHERE uuid = :uuid";
NativeQuery<?> query = session.createNativeQuery(sql);
query.setParameter("uuid", itemToRemove);
query.executeUpdate();
} catch (Exception e) {
throw new RuntimeException(e);
}

}

@Override
public UUID pollItemToUpdate(Context context) {
try {
Session session = getHibernateSession();
String sql = "SELECT cast(uuid as varchar) FROM itemupdate_metadata_enhancement"
+ " ORDER BY date_queued ASC LIMIT 1";
NativeQuery<?> query = session.createNativeQuery(sql);
Object uuidObj = query.uniqueResult();
if (uuidObj != null) {
UUID uuid;
if (uuidObj instanceof String) {
uuid = (UUID) UUID.fromString((String) uuidObj);
} else {
throw new RuntimeException("Unexpected result type from the database " + uuidObj);
}
removeItemForUpdate(context, uuid);
return uuid;
} else {
return null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}

}

@Override
public int saveAffectedItemsForUpdate(Context context, UUID uuid) {
try {
Session session = getHibernateSession();
MetadataSchemaService schemaService = ContentServiceFactory.getInstance().getMetadataSchemaService();
MetadataSchema schema = schemaService.find(context, "cris");
String sqlInsertOrUpdate;
if ("org.h2.Driver".equals(configurationService.getProperty("db.driver"))) {
// H2 doesn't support the INSERT OR UPDATE statement so let's do in two steps
// update queued date for records already in the queue
String sqlUpdate = "UPDATE itemupdate_metadata_enhancement"
+ " SET date_queued = CURRENT_TIMESTAMP"
+ " WHERE uuid IN ("
+ " SELECT dspace_object_id FROM metadatavalue " + " WHERE metadata_field_id IN"
+ " ( SELECT metadata_field_id FROM metadatafieldregistry "
+ " WHERE metadata_schema_id = :schema AND element = 'virtualsource')"
+ " AND text_value = :uuid)";
String sqlInsert =
"INSERT INTO itemupdate_metadata_enhancement (uuid, date_queued)"
+ " SELECT DISTINCT dspace_object_id, CURRENT_TIMESTAMP FROM metadatavalue "
+ " WHERE metadata_field_id IN"
+ " ( SELECT metadata_field_id FROM metadatafieldregistry "
+ " WHERE metadata_schema_id = :schema AND element = 'virtualsource'"
+ " AND text_value = :uuid) "
+ " AND NOT EXISTS ("
+ " SELECT 1"
+ " FROM itemupdate_metadata_enhancement"
+ " WHERE uuid = :uuid"
+ " )";
NativeQuery<?> queryUpdate = session.createNativeQuery(sqlUpdate);
queryUpdate.setParameter("uuid", uuid.toString());
queryUpdate.setParameter("schema", schema.getID());
queryUpdate.executeUpdate();
NativeQuery<?> queryInsert = session.createNativeQuery(sqlInsert);
queryInsert.setParameter("uuid", uuid.toString());
queryInsert.setParameter("schema", schema.getID());
return queryInsert.executeUpdate();
} else {
sqlInsertOrUpdate = "INSERT INTO itemupdate_metadata_enhancement (uuid, date_queued) "
+ " SELECT DISTINCT dspace_object_id, CURRENT_TIMESTAMP FROM metadatavalue "
+ " WHERE metadata_field_id IN"
+ " ( SELECT metadata_field_id FROM metadatafieldregistry "
+ " WHERE metadata_schema_id = :schema AND element = 'virtualsource'"
+ " AND text_value = :uuid) "
+ " ON CONFLICT (uuid) DO UPDATE"
+ " SET date_queued = EXCLUDED.date_queued";
NativeQuery<?> queryInsertOrUpdate = session.createNativeQuery(sqlInsertOrUpdate);
queryInsertOrUpdate.setParameter("uuid", uuid.toString());
queryInsertOrUpdate.setParameter("schema", schema.getID());
return queryInsertOrUpdate.executeUpdate();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* The Hibernate Session used in the current thread
*
* @return the current Session.
* @throws SQLException
*/
private Session getHibernateSession() throws SQLException {
DBConnection dbConnection = new DSpace().getServiceManager().getServiceByName(null, DBConnection.class);
return ((Session) dbConnection.getSession());
}

public UUID ConvertByteArrayToUUID(byte[] bytea) {
long mostSigBits = 0;
long leastSigBits = 0;
for (int i = 0; i < 8; i++) {
mostSigBits = (mostSigBits << 8) | (bytea[i] & 0xff);
leastSigBits = (leastSigBits << 8) | (bytea[i + 8] & 0xff);
}

UUID uuid = new UUID(mostSigBits, leastSigBits);
return uuid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.dspace.content.Item;
import org.dspace.content.enhancer.service.ItemEnhancerService;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.content.service.ItemService;
import org.dspace.core.Context;
import org.dspace.event.Consumer;
import org.dspace.event.Event;
Expand All @@ -29,10 +32,12 @@
public class ItemEnhancerConsumer implements Consumer {

public static final String ITEMENHANCER_ENABLED = "itemenhancer.enabled";
private Set<Item> itemsAlreadyProcessed = new HashSet<Item>();
private Set<UUID> itemsToProcess = new HashSet<UUID>();

private ItemEnhancerService itemEnhancerService;

private ItemService itemService = ContentServiceFactory.getInstance().getItemService();

private ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService();

@Override
Expand All @@ -53,19 +58,11 @@ public void consume(Context context, Event event) throws Exception {
}

Item item = (Item) event.getSubject(context);
if (item == null || itemsAlreadyProcessed.contains(item) || !item.isArchived()) {
if (item == null || !item.isArchived()) {
return;
}

itemsAlreadyProcessed.add(item);

context.turnOffAuthorisationSystem();
try {
itemEnhancerService.enhance(context, item, false);
} finally {
context.restoreAuthSystemState();
}

itemsToProcess.add(item.getID());
}

protected boolean isConsumerEnabled() {
Expand All @@ -74,7 +71,19 @@ protected boolean isConsumerEnabled() {

@Override
public void end(Context ctx) throws Exception {
itemsAlreadyProcessed.clear();
ctx.turnOffAuthorisationSystem();
try {
for (UUID uuid : itemsToProcess) {
Item item = itemService.find(ctx, uuid);
if (item != null) {
itemEnhancerService.enhance(ctx, item, false);
itemEnhancerService.saveAffectedItemsForUpdate(ctx, item.getID());
}
}
} finally {
ctx.restoreAuthSystemState();
}
itemsToProcess.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ private void clearAllVirtualMetadata(Context context, Item item) throws SQLExcep
private void addMetadata(Context context, Item item, Map<String, List<MetadataValueDTO>> toBeMetadataValues)
throws SQLException {
for (Entry<String, List<MetadataValueDTO>> metadataValues : toBeMetadataValues.entrySet()) {
addVirtualSourceField(context, item, metadataValues.getKey());
for (MetadataValueDTO dto : metadataValues.getValue()) {
addVirtualSourceField(context, item, metadataValues.getKey());
addVirtualField(context, item, dto.getValue(), dto.getAuthority(), dto.getLanguage(),
dto.getConfidence());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
*/
package org.dspace.content.enhancer.service;

import java.sql.SQLException;
import java.util.UUID;

import org.dspace.content.Item;
import org.dspace.content.dao.ItemForMetadataEnhancementUpdateDAO;
import org.dspace.core.Context;

/**
Expand All @@ -29,4 +33,21 @@ public interface ItemEnhancerService {
*/
void enhance(Context context, Item item, boolean deepMode);

/**
* Find items that could be affected by a change of the item with given uuid
* and save them to db for future processing
*
* @param context the DSpace Context
* @param uuid UUID of the changed item
*/
void saveAffectedItemsForUpdate(Context context, UUID uuid) throws SQLException;

/**
* Extract the first uuid in the itemupdate_metadata_enhancement table, see
* {@link ItemForMetadataEnhancementUpdateDAO#pollItemToUpdate(Context)}
*
* @param context the DSpace Context
* @return UUID of the older item queued for update
*/
UUID pollItemToUpdate(Context context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import java.sql.SQLException;
import java.util.List;
import java.util.UUID;

import org.dspace.authorize.AuthorizeException;
import org.dspace.content.Item;
import org.dspace.content.dao.ItemForMetadataEnhancementUpdateDAO;
import org.dspace.content.enhancer.ItemEnhancer;
import org.dspace.content.enhancer.service.ItemEnhancerService;
import org.dspace.content.service.ItemService;
Expand All @@ -32,10 +34,15 @@ public class ItemEnhancerServiceImpl implements ItemEnhancerService {
@Autowired
private ItemService itemService;

@Autowired
private ItemForMetadataEnhancementUpdateDAO itemForMetadataEnhancementUpdateDAO;

@Override
public void enhance(Context context, Item item, boolean deepMode) {
boolean isUpdateNeeded = false;

if (deepMode) {
itemForMetadataEnhancementUpdateDAO.removeItemForUpdate(context, item.getID());
}
for (ItemEnhancer itemEnhancer : itemEnhancers) {
if (itemEnhancer.canEnhance(context, item)) {
isUpdateNeeded = itemEnhancer.enhance(context, item, deepMode) || isUpdateNeeded;
Expand All @@ -47,6 +54,16 @@ public void enhance(Context context, Item item, boolean deepMode) {
}
}

@Override
public void saveAffectedItemsForUpdate(Context context, UUID uuid) throws SQLException {
itemForMetadataEnhancementUpdateDAO.saveAffectedItemsForUpdate(context, uuid);
}

@Override
public UUID pollItemToUpdate(Context context) {
return itemForMetadataEnhancementUpdateDAO.pollItemToUpdate(context);
}

private void updateItem(Context context, Item item) {
try {
itemService.update(context, item);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--
-- The contents of this file are subject to the license and copyright
-- detailed in the LICENSE and NOTICE files at the root of the source
-- tree and available online at
--
-- http://www.dspace.org/license/
--

-----------------------------------------------------------------------------------
-- Create TABLE itemupdate_metadata_enhancement
-----------------------------------------------------------------------------------

CREATE TABLE itemupdate_metadata_enhancement
(
uuid UUID NOT NULL PRIMARY KEY,
date_queued TIMESTAMP NOT NULL
);
CREATE INDEX idx_date_queued ON itemupdate_metadata_enhancement(date_queued);
Loading

0 comments on commit 03b269b

Please sign in to comment.