Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OF-2437: Improve CachingPubsubPersistenceProvider #2664

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2020-2022 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2020-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/
package org.jivesoftware.openfire.pubsub;

import com.google.common.annotations.VisibleForTesting;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.pep.PEPService;
import org.jivesoftware.openfire.pubsub.cluster.FlushTask;
Expand All @@ -36,29 +37,48 @@
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

/**
* A persistence provider for Pub/Sub functionality that adds caching behavior. Instead of 'writing through' to the
* persistence layer, the caching implementation will create batches of operations (optimizing away redundant actions).
* Additionally, recently accessed published data items are cached. This improves performance when processing many
* pub/sub operations (node modifiations, item publications, etc).
*
* This provider itself does not persist data. Instead, it uses a different persistence provider as a delegate to
* perform these actions.
*
* @author Guus der Kinderen, [email protected]
*/
public class CachingPubsubPersistenceProvider implements PubSubPersistenceProvider
{
private static final Logger log = LoggerFactory.getLogger(CachingPubsubPersistenceProvider.class);

/**
* The class definition used to instantiate the delegate, used by this instance to interact with persistent data
* storage.
*/
public static final SystemProperty<Class> DELEGATE = SystemProperty.Builder.ofType(Class.class)
.setKey("provider.pubsub-persistence.caching.delegate-className")
.setBaseClass(PubSubPersistenceProvider.class)
.setDefaultValue(DefaultPubSubPersistenceProvider.class)
.setDynamic(false)
.build();

private PubSubPersistenceProvider delegate;
/**
* The delegate instance, used by this instance to interact with persistent data storage.
*/
@VisibleForTesting
PubSubPersistenceProvider delegate;

/**
* Pseudo-random number generator is used to offset timing for scheduled tasks
* within a cluster (so they don't run at the same time on all members).
*/
private Random prng = new Random();
private final Random prng = new Random();

/**
* Flush timer delay is configurable, but not less than 20 seconds (default: 2 mins)
*/
private static Duration flushTimerDelay = Duration.ofSeconds(Math.max( 20, JiveGlobals.getIntProperty( "xmpp.pubsub.flush.timer", 120)));
private static final Duration flushTimerDelay = Duration.ofSeconds(Math.max( 20, JiveGlobals.getIntProperty( "xmpp.pubsub.flush.timer", 120)));

/**
* Maximum number of published items allowed in the write cache
Expand All @@ -69,20 +89,24 @@ public class CachingPubsubPersistenceProvider implements PubSubPersistenceProvid
/**
* Queue that holds the (wrapped) items that need to be added to the database.
*/
private Deque<PublishedItem> itemsToAdd = new ConcurrentLinkedDeque<>();
@VisibleForTesting
Deque<PublishedItem> itemsToAdd = new ConcurrentLinkedDeque<>();

/**
* Queue that holds the items that need to be deleted from the database.
*/
private Deque<PublishedItem> itemsToDelete = new ConcurrentLinkedDeque<>();
@VisibleForTesting
Deque<PublishedItem> itemsToDelete = new ConcurrentLinkedDeque<>();

/**
* Keeps reference to published items that haven't been persisted yet so they
* can be removed before being deleted.
*/
private final HashMap<PublishedItem.UniqueIdentifier, PublishedItem> itemsPending = new HashMap<>();
@VisibleForTesting
final HashMap<PublishedItem.UniqueIdentifier, PublishedItem> itemsPending = new HashMap<>();

private ConcurrentMap<Node.UniqueIdentifier, List<NodeOperation>> nodesToProcess = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<Node.UniqueIdentifier, List<NodeOperation>> nodesToProcess = new ConcurrentHashMap<>();

/**
* Cache name for recently accessed published items.
Expand Down Expand Up @@ -181,6 +205,11 @@ private void flushPendingNode( Node.UniqueIdentifier uniqueIdentifier )
{
log.trace( "Flushing pending node: {} for service: {}", uniqueIdentifier.getNodeId(), uniqueIdentifier.getServiceIdentifier().getServiceId() );

nodesToProcess.computeIfPresent( uniqueIdentifier , ( key, operations ) -> {
operations.forEach( operation -> log.trace("- {}", operation) );
// Returning null causes the mapping to removed from nodesToProcess.
return null;
} );
// TODO verify if this is having the desired effect. - nodes could be in a hierarchy, which could warrant for flushing the entire tree.
// TODO verify that this is thread-safe.
nodesToProcess.computeIfPresent( uniqueIdentifier , ( key, operations ) -> {
Expand Down Expand Up @@ -234,8 +263,12 @@ public void removeNode(Node node) {
}

final List<NodeOperation> operations = nodesToProcess.computeIfAbsent( node.getUniqueIdentifier(), id -> new ArrayList<>() );
operations.clear(); // Any previously recorded, but as of yet unsaved operations, can be skipped.
operations.add( NodeOperation.remove( node ));
final boolean hadCreate = operations.stream().anyMatch(operation -> operation.action.equals(NodeOperation.Action.CREATE));
operations.removeIf(operation -> !operation.action.equals(NodeOperation.Action.REMOVE)); // Any previously recorded, but as of yet unsaved operations, can be skipped.

if (!hadCreate) { // If one of the operations that have not been executed was a node create, we need not delete the node either.
operations.add(NodeOperation.remove(node));
}
}

@Override
Expand Down Expand Up @@ -314,18 +347,25 @@ public void removeAffiliation(Node node, NodeAffiliate affiliate) {
final List<NodeOperation> operations = nodesToProcess.computeIfAbsent( node.getUniqueIdentifier(), id -> new ArrayList<>() );

// This affiliation removal can replace any pending creation, update or delete of the same affiliate (since the last create/delete of the node or affiliation change of this affiliate to the node).
boolean hadCreate = false;
final ListIterator<NodeOperation> iter = operations.listIterator( operations.size() );
while ( iter.hasPrevious() ) {
final NodeOperation operation = iter.previous();
if ( Arrays.asList( NodeOperation.Action.CREATE_AFFILIATION, NodeOperation.Action.UPDATE_AFFILIATION, NodeOperation.Action.REMOVE_AFFILIATION ).contains( operation.action ) ) {
if ( Arrays.asList( NodeOperation.Action.CREATE_AFFILIATION, NodeOperation.Action.UPDATE_AFFILIATION ).contains( operation.action ) ) {
if ( affiliate.getJID().equals( operation.affiliate.getJID() ) ) {
if (operation.action.equals(NodeOperation.Action.CREATE_AFFILIATION)) {
hadCreate = true;
}
iter.remove(); // This is replaced by the update that's being added.
}
} else {
break; // Operations that precede anything other than the last operations that are affiliate changes shouldn't be replaced.
}
}
operations.add( NodeOperation.removeAffiliation( node, affiliate ) );

if (!hadCreate) { // If one of the operations that have not been executed was an affiliation create, we need not delete the affiliation either.
operations.add(NodeOperation.removeAffiliation(node, affiliate));
}
}

@Override
Expand Down Expand Up @@ -365,19 +405,26 @@ public void removeSubscription(NodeSubscription subscription) {

final List<NodeOperation> operations = nodesToProcess.computeIfAbsent( subscription.getNode().getUniqueIdentifier(), id -> new ArrayList<>() );

// This subscription removal can replace any pending creation, update or delete of the same subscription (since the last create/delete of the node or subscription change of this subscription to the node).
// This subscription removal can replace any pending creation or update of the same subscription (since the last create/delete of the node or subscription change of this subscription to the node).
boolean hadCreate = false;
final ListIterator<NodeOperation> iter = operations.listIterator( operations.size() );
while ( iter.hasPrevious() ) {
final NodeOperation operation = iter.previous();
if ( Arrays.asList( NodeOperation.Action.CREATE_SUBSCRIPTION, NodeOperation.Action.UPDATE_SUBSCRIPTION, NodeOperation.Action.REMOVE_SUBSCRIPTION ).contains( operation.action ) ) {
if ( Arrays.asList( NodeOperation.Action.CREATE_SUBSCRIPTION, NodeOperation.Action.UPDATE_SUBSCRIPTION ).contains( operation.action ) ) {
if ( subscription.getID().equals( operation.subscription.getID() ) ) {
if (operation.action == NodeOperation.Action.CREATE_SUBSCRIPTION) {
hadCreate = true;
}
iter.remove(); // This is replaced by the update that's being added.
}
} else {
break; // Operations that precede anything other than the last operations that are subscription changes shouldn't be replaced.
}
}
operations.add( NodeOperation.removeSubscription( subscription.getNode(), subscription ) );

if (!hadCreate) { // If one of the operations that have not been executed was a subscription create, we need not delete the subscription either.
operations.add(NodeOperation.removeSubscription(subscription.getNode(), subscription));
}
}

private void process( final NodeOperation operation ) {
Expand Down Expand Up @@ -461,6 +508,10 @@ public void savePublishedItem(PublishedItem item) {
if (itemToReplace != null) {
itemsToAdd.remove(itemToReplace); // remove duplicate from itemsToAdd linked list
}

// TODO this iterates over all elements in the collection. See if this can be improved for performance.
itemsToDelete.removeIf(scheduledItem -> item.getUniqueIdentifier().equals(scheduledItem.getUniqueIdentifier()));

itemsToAdd.addLast(item);
itemsPending.put(itemKey, item);
}
Expand Down Expand Up @@ -600,7 +651,10 @@ public void removePublishedItem(PublishedItem item) {
synchronized (itemsPending)
{
itemsToDelete.addLast(item);
itemsPending.remove(itemKey);
PublishedItem itemToReplace = itemsPending.remove(itemKey);
if (itemToReplace != null) {
itemsToAdd.remove(itemToReplace); // remove duplicate from itemsToAdd linked list
}
}
}

Expand Down
Loading
Loading