GH-5197: preparation for supporting fair sub-query execution in FedX #5198
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,12 +11,14 @@ | |
package org.eclipse.rdf4j.federated.evaluation.concurrent; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.eclipse.rdf4j.common.annotation.Experimental; | ||
import org.eclipse.rdf4j.common.iteration.CloseableIteration; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; | ||
|
@@ -42,7 +44,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw | |
|
||
private final ExecutorService executor; | ||
|
||
private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>(); | ||
// TODO: in the next major version of RDF4J this final field should be removed. | ||
// Initialization of the executor service should manage the details
private final BlockingQueue<Runnable> _taskQueue; | ||
|
||
private final int nWorkers; | ||
private final String name; | ||
|
@@ -57,7 +61,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw | |
public ControlledWorkerScheduler(int nWorkers, String name) { | ||
this.nWorkers = nWorkers; | ||
this.name = name; | ||
this.executor = createExecutorService(); | ||
this._taskQueue = createBlockingQueue(); | ||
this.executor = createExecutorService(nWorkers, name); | ||
} | ||
|
||
/** | ||
|
@@ -112,13 +117,36 @@ public int getTotalNumberOfWorkers() { | |
return nWorkers; | ||
} | ||
|
||
@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal | ||
public int getNumberOfTasks() { | ||
return _taskQueue.size(); | ||
} | ||
|
||
private ExecutorService createExecutorService() { | ||
/** | ||
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a | ||
* {@link LinkedBlockingQueue}. | ||
* | ||
* @return | ||
*/ | ||
@Experimental | ||
protected BlockingQueue<Runnable> createBlockingQueue() { | ||
return new LinkedBlockingQueue<>(); | ||
} | ||
|
||
/** | ||
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The | ||
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}. | ||
* | ||
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s) | ||
* | ||
* @param nWorkers the number of workers in the thread pool | ||
* @param name the base name for threads in the pool | ||
* @return | ||
*/ | ||
@Experimental | ||
protected ExecutorService createExecutorService(int nWorkers, String name) { | ||
Do you intend to override this with your own implementation in your code base, or is it something that you're going to override somewhere in RDF4J? I could see that it would be useful to specify a different queue, like the PriorityBlockingQueue, to optimise the order of the operations. Could it make sense to have the BlockingQueue be a parameter?

In the end we want to have the priority / fair based implementation directly in RDF4J. This partial "extension point" is mostly to allow us to work asynchronously of the RDF4J release, i.e. we do implementation and testing as an extension in our code base according to our sprint planning, and once ready contribute the implementation back to RDF4J (tentatively for RDF4J 5.2). This approach makes testing much easier. We also developed the left bind join implementation using this approach.

Maybe we can mark it as experimental so that you're able to modify it later on if needed?

Good suggestion 👍, this gives us a bit more freedom. I have added the annotation.
||
|
||
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue, | ||
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue, | ||
new NamingThreadFactory(name)); | ||
executor.allowCoreThreadTimeOut(true); | ||
return executor; | ||
|
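As a side note on the `PriorityBlockingQueue` idea raised in the review thread above: the following is a minimal, self-contained sketch using only `java.util.concurrent`, not RDF4J code. The class `PriorityQueueSketch`, the `PrioritizedTask` type and its integer priority are hypothetical names introduced for illustration. It assumes tasks are handed over with `execute`, because tasks passed through `submit` get wrapped in a `FutureTask`, which is not `Comparable` and would be rejected by a comparator-less priority queue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityQueueSketch {

    /** Hypothetical task type: a lower priority value is taken from the queue first. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        private final int priority;
        private final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Runs three queued tasks on a single worker and returns the order in which they executed. */
    static List<Integer> runDemo() {
        // Same executor settings as in the diff, but backed by a priority queue.
        BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, queue);
        executor.allowCoreThreadTimeOut(true);

        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // The first task occupies the only worker, so the following tasks must wait in the queue.
        executor.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Enqueue out of order; the queue hands them to the worker by priority.
        for (int p : new int[] { 3, 1, 2 }) {
            executor.execute(new PrioritizedTask(p, () -> order.add(p)));
        }

        gate.countDown();
        executor.shutdown(); // already queued tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [1, 2, 3]
    }
}
```

With a single worker the effect is easy to observe. With `nWorkers` threads the queue only influences the order in which waiting tasks are picked up, not the order in which they finish.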
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/******************************************************************************* | ||
* Copyright (c) 2024 Eclipse RDF4J contributors. | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Distribution License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/org/documents/edl-v10.php. | ||
* | ||
* SPDX-License-Identifier: BSD-3-Clause | ||
*******************************************************************************/ | ||
package org.eclipse.rdf4j.federated.evaluation.concurrent; | ||
|
||
import org.eclipse.rdf4j.federated.FederationContext; | ||
import org.eclipse.rdf4j.query.BindingSet; | ||
|
||
/** | ||
* The default {@link SchedulerFactory} | ||
*/ | ||
public class DefaultSchedulerFactory implements SchedulerFactory { | ||
|
||
public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory(); | ||
|
||
@Override | ||
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, | ||
int nWorkers) { | ||
return new ControlledWorkerScheduler<>(nWorkers, | ||
Is this where you're going to return other schedulers in the future?

Yes, exactly: either in this default factory or in a different factory.
||
"Join Scheduler"); | ||
} | ||
|
||
@Override | ||
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, | ||
int nWorkers) { | ||
return new ControlledWorkerScheduler<>(nWorkers, | ||
"Union Scheduler"); | ||
} | ||
|
||
@Override | ||
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, | ||
int nWorkers) { | ||
return new ControlledWorkerScheduler<>(nWorkers, | ||
"Left Join Scheduler"); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/******************************************************************************* | ||
* Copyright (c) 2024 Eclipse RDF4J contributors. | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Distribution License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/org/documents/edl-v10.php. | ||
* | ||
* SPDX-License-Identifier: BSD-3-Clause | ||
*******************************************************************************/ | ||
package org.eclipse.rdf4j.federated.evaluation.concurrent; | ||
|
||
import org.eclipse.rdf4j.federated.FederationContext; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask; | ||
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask; | ||
import org.eclipse.rdf4j.query.BindingSet; | ||
|
||
/** | ||
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background | ||
* | ||
* @see DefaultSchedulerFactory | ||
* @author Andreas Schwarte | ||
*/ | ||
public interface SchedulerFactory { | ||
|
||
/** | ||
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind | ||
* joins) | ||
* | ||
* @param federationContext | ||
* @param nWorkers | ||
* @return | ||
* @see ControlledWorkerBindJoin | ||
* @see ParallelBoundJoinTask | ||
*/ | ||
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers); | ||
I assume that this is for inner joins. Could you add something to the Javadoc explaining what it's used for? E.g. mostly for joining statement patterns in the query. Similarly, for the left join, that it's typically for handling OPTIONAL clauses in the query.

Good suggestions, added some details.
||
|
||
/** | ||
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel) | ||
* | ||
* @param federationContext | ||
* @param nWorkers | ||
* @return | ||
*/ | ||
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers); | ||
|
||
/** | ||
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind | ||
* joins, i.e. OPTIONAL) | ||
* | ||
* @param federationContext | ||
* @param nWorkers | ||
* @return | ||
* @see ControlledWorkerBindLeftJoin | ||
* @see ParallelBindLeftJoinTask | ||
*/ | ||
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers); | ||
} |
This looks like it's in the reset method. Could we have an assert to check that the existing joinScheduler is either null or shutdown?

The line above (see 126, joinScheduler.abort();) makes sure to terminate the thread pool if it is active. Shouldn't this avoid the assertion?

Sorry, I must have overlooked that. Will the abort always wait until all threads are stopped?
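
For context on the last question, in plain `java.util.concurrent` terms (this says nothing about what `joinScheduler.abort()` does internally, which this page does not show): `ExecutorService.shutdownNow()` interrupts running workers and returns immediately, while `awaitTermination` is the call that blocks until the workers have exited or a timeout elapses. A minimal sketch, where `AbortSketch` and `abortAndWait` are hypothetical names introduced for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AbortSketch {

    /**
     * Hypothetical helper: interrupt all workers, then wait (bounded) for them to exit.
     *
     * @return true if the pool terminated within the timeout
     */
    static boolean abortAndWait(ExecutorService executor, long timeoutMillis) {
        executor.shutdownNow(); // does not wait for running tasks
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Starts one long-running, interruptible task and aborts the pool while it is running. */
    static boolean demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch started = new CountDownLatch(1);

        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a long-running sub-query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cooperate with the abort
            }
        });

        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        boolean terminated = abortAndWait(executor, 5_000);
        return terminated && executor.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

An assertion of the kind suggested in the thread would only be reliable after a bounded wait like this one, and only if the running tasks respond to interruption.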