Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-5197: preparation for supporting fair sub-query execution in FedX #5198

Merged
merged 3 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint;
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXException;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {

private FederationEvaluationStrategyFactory strategyFactory;

private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE;

private WriteStrategyFactory writeStrategyFactory;

private File dataDir;
Expand Down Expand Up @@ -96,6 +100,19 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
this.strategyFactory = strategyFactory;
}

/* package */ SchedulerFactory getSchedulerFactory() {
return schedulerFactory;
}

/**
* Set the {@link SchedulerFactory}. Can only be done before initialization of the federation
*
* @param schedulerFactory the {@link SchedulerFactory}
*/
public void setSchedulerFactory(SchedulerFactory schedulerFactory) {
this.schedulerFactory = schedulerFactory;
}

/**
*
* @param writeStrategyFactory the {@link WriteStrategyFactory}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
Expand Down Expand Up @@ -118,26 +119,28 @@ public void reset() {
log.debug("Scheduler for join and union are reset.");
}

SchedulerFactory schedulerFactory = federation.getSchedulerFactory();

Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper();
if (joinScheduler != null) {
joinScheduler.abort();
}
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(),
"Join Scheduler");
joinScheduler = schedulerFactory.createJoinScheduler(federationContext,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it's in the reset method. Could we have an assert to check that the existing joinScheduler is either null or shutdown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line above (see 126, joinScheduler.abort(); makes sure to terminate the thread pool, if it is active. Shouldn't this avoid the assertion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. Must have overlooked that. Will the abort always wait until all threads are stopped?

federationContext.getConfig().getJoinWorkerThreads());
taskWrapper.ifPresent(joinScheduler::setTaskWrapper);

if (unionScheduler != null) {
unionScheduler.abort();
}
unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(),
"Union Scheduler");
unionScheduler = schedulerFactory.createUnionScheduler(federationContext,
federationContext.getConfig().getUnionWorkerThreads());
taskWrapper.ifPresent(unionScheduler::setTaskWrapper);

if (leftJoinScheduler != null) {
leftJoinScheduler.abort();
}
leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(),
"Left Join Scheduler");
leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext,
federationContext.getConfig().getLeftJoinWorkerThreads());
taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
Expand All @@ -42,7 +44,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw

private final ExecutorService executor;

private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
// TODO: in the next major version of RDF4J this final field should be removed.
// Initialization of the executor service should managed the details
private final BlockingQueue<Runnable> _taskQueue;

private final int nWorkers;
private final String name;
Expand All @@ -57,7 +61,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
public ControlledWorkerScheduler(int nWorkers, String name) {
this.nWorkers = nWorkers;
this.name = name;
this.executor = createExecutorService();
this._taskQueue = createBlockingQueue();
this.executor = createExecutorService(nWorkers, name);
}

/**
Expand Down Expand Up @@ -112,13 +117,36 @@ public int getTotalNumberOfWorkers() {
return nWorkers;
}

@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
public int getNumberOfTasks() {
return _taskQueue.size();
}

private ExecutorService createExecutorService() {
/**
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
* {@link LinkedBlockingQueue}.
*
* @return
*/
@Experimental
protected BlockingQueue<Runnable> createBlockingQueue() {
return new LinkedBlockingQueue<>();
}

/**
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
*
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
*
* @param nWorkers the number of workers in the thread pool
* @param name the base name for threads in the pool
* @return
*/
@Experimental
protected ExecutorService createExecutorService(int nWorkers, String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to override this with your own implementation in your code base, or is it something that you're going to override somewhere in RDF4J?

I could see that it would be useful to specify a different queue, like the PriorityBlockingQueue, to optimise the order of the operations. Could it make sense to have the BlockingQueue be a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end we want to have the priority / fair based implementation directly in RDF4J.

This partial "extension point" is mostly to allow us to work asychronously of the RDF4J release, i.e. we do implementation and testing as extension in our code-base according to our sprint planning, and once ready contribute the implementation back to RDF4J (tentatively for RDF4J 5.2)

This approach makes it much easier for testing. We also developed the left bind join implementation using this approach

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can mark it as experimental so that you're able to modify it later on if needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion 👍 , this gives us a bit more freedom. Have added the annotation.


ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
new NamingThreadFactory(name));
executor.allowCoreThreadTimeOut(true);
return executor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.query.BindingSet;

/**
* The default {@link SchedulerFactory}
*/
public class DefaultSchedulerFactory implements SchedulerFactory {

public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory();

@Override
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this where you're going to return other schedulers in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly, either in this default factory or in a different factory

"Join Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Union Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Left Join Scheduler");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
import org.eclipse.rdf4j.query.BindingSet;

/**
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background
*
* @see DefaultSchedulerFactory
* @author Andreas Schwarte
*/
public interface SchedulerFactory {

/**
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind
* joins)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindJoin
* @see ParallelBoundJoinTask
*/
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this is for inner join. Could you add something to the javadocs explaining what it's used for? Eg. Mostly for joining statement patterns in the query.

Similarly for the left join that it's typically for handling OPTIONAL clauses in the query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestions, added some details


/**
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel)
*
* @param federationContext
* @param nWorkers
* @return
*/
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind
* joins, i.e. OPTIONAL)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindLeftJoin
* @see ParallelBindLeftJoinTask
*/
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
}
Loading