Skip to content

Commit

Permalink
ca. 1 800 000 statements per second
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Jan 19, 2025
1 parent b9c4d91 commit ec5a712
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ data
wikidata
qendpoint-store/wdbench-indexes
wdbench-results
testing
indexing
wdbench-indexes
6 changes: 3 additions & 3 deletions qendpoint-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<source>19</source>
<target>19</target>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
<version>1.27.1</version>
</dependency>
<dependency>
<groupId>org.apache.jena</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.the_qa_company.qendpoint.core.util.listener.IntermediateListener;
import com.the_qa_company.qendpoint.core.util.string.ByteString;
import com.the_qa_company.qendpoint.core.util.string.CompactString;
import org.apache.jena.base.Sys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,6 +52,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl<TripleString
private final int k;
private final boolean debugSleepKwayDict;
private final boolean quads;
private final long start = System.currentTimeMillis();

public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<TripleString> source,
MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
Expand Down Expand Up @@ -250,7 +252,10 @@ public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath out
}

if (tripleID % 100_000 == 0) {
listener.notifyProgress(10, "reading triples " + tripleID);
// use start to measure how many triples are read per second
int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0));

listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
}
// too much ram allowed?
if (subjects.size() == Integer.MAX_VALUE - 6) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.the_qa_company.qendpoint.core.iterator.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
Expand All @@ -15,6 +17,7 @@ public class AsyncIteratorFetcher<E> implements Supplier<E> {
private final Iterator<E> iterator;
private final Lock lock = new ReentrantLock();
private boolean end;
ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();

public AsyncIteratorFetcher(Iterator<E> iterator) {
this.iterator = iterator;
Expand All @@ -25,16 +28,33 @@ public AsyncIteratorFetcher(Iterator<E> iterator) {
*/
@Override
public E get() {
lock.lock();
try {
if (iterator.hasNext()) {
return iterator.next();
E poll = queue.poll();

if (poll != null) {
return poll;
}

synchronized (this) {
poll = queue.poll();
if (poll == null) {
if (iterator.hasNext()) {
poll = iterator.next();
}
ArrayList<E> objects = new ArrayList<>(128);

for (int i = 0; i < 128 && iterator.hasNext(); i++) {
objects.add(iterator.next());
}

queue.addAll(objects);
}

if (poll == null) {
end = true;
}
end = true;
return null;
} finally {
lock.unlock();
return poll;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -110,14 +112,26 @@ public T get() {
}
}

private final ArrayBlockingQueue<QueueObject<T>> queue = new ArrayBlockingQueue<>(16);
private final ArrayBlockingQueue<QueueObject<T>>[] queue = new ArrayBlockingQueue[] {
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024) };

private T next;
private boolean end;
private PipedIteratorException exception;

private Thread thread;

AtomicInteger indexHasNext = new AtomicInteger(0);

volatile ArrayBlockingQueue<QueueObject<T>> focusQueue;

@Override
public boolean hasNext() {
if (end) {
Expand All @@ -129,7 +143,33 @@ public boolean hasNext() {

QueueObject<T> obj;
try {
obj = queue.take();
var focusQueue = this.focusQueue;
if (focusQueue != null) {
QueueObject<T> poll = focusQueue.poll(1, TimeUnit.MILLISECONDS);
if (poll != null) {
obj = poll;
} else {
obj = null;
this.focusQueue = null;
}
} else {
obj = null;
}

if (obj == null) {

int i = Thread.currentThread().hashCode();
obj = queue[i % queue.length].poll(10, java.util.concurrent.TimeUnit.MILLISECONDS);
while (obj == null) {
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
obj = queueObjects.poll(1, TimeUnit.MILLISECONDS);
if (obj != null) {
break;
}
}
}
}

} catch (InterruptedException e) {
throw new PipedIteratorException("Can't read pipe", e);
}
Expand Down Expand Up @@ -162,15 +202,19 @@ public void closePipe() {
public void closePipe(Throwable e) {
if (e != null) {
// clear the queue to force the exception
queue.clear();
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
queueObjects.clear();
}
if (e instanceof PipedIteratorException) {
this.exception = (PipedIteratorException) e;
} else {
this.exception = new PipedIteratorException("closing exception", e);
}
}
try {
queue.put(new EndQueueObject());
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
queueObjects.put(new EndQueueObject());
}
} catch (InterruptedException ee) {
throw new PipedIteratorException("Can't close pipe", ee);
}
Expand Down Expand Up @@ -198,9 +242,25 @@ public <E> Iterator<E> mapWithId(MapIterator.MapWithIdFunction<T, E> mappingFunc
return new MapIterator<>(this, mappingFunction);
}

AtomicInteger index = new AtomicInteger(0);

public void addElement(T node) {
int i = Thread.currentThread().hashCode();
int l = i % queue.length;
try {
queue.put(new ElementQueueObject(node));
boolean success = queue[l].offer(new ElementQueueObject(node), 10, TimeUnit.MILLISECONDS);
if (!success) {
focusQueue = queue[l];
while (!success) {
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS);
if (success) {
break;
}
}
}
}

} catch (InterruptedException ee) {
throw new PipedIteratorException("Can't add element to pipe", ee);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.the_qa_company.qendpoint.core.rdf.parsers;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

public class ConcurrentInputStream {

private final InputStream source;
private final int numberOfStreams;

private PipedInputStream[] pipedInputStreams;
private PipedOutputStream[] pipedOutputStreams;

private PipedInputStream bnodeInputStream;
private PipedOutputStream bnodeOutputStream;

private Thread readerThread;

public ConcurrentInputStream(InputStream stream, int numberOfStreams) {
this.source = stream;
this.numberOfStreams = numberOfStreams;
setupPipes();
startReadingThread();
}

private void setupPipes() {
pipedInputStreams = new PipedInputStream[numberOfStreams];
pipedOutputStreams = new PipedOutputStream[numberOfStreams];

try {
// Set up main fan-out pipes
for (int i = 0; i < numberOfStreams; i++) {
pipedOutputStreams[i] = new PipedOutputStream();
pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], 131072 * 1024);
}

// Set up bnode pipe
bnodeOutputStream = new PipedOutputStream();
bnodeInputStream = new PipedInputStream(bnodeOutputStream, 131072 * 1024);

} catch (IOException e) {
throw new RuntimeException("Error creating pipes", e);
}
}

private void startReadingThread() {
readerThread = new Thread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) {
String line;
int currentStreamIndex = 0;
long lineCount = 0;
long start = System.currentTimeMillis();
while ((line = reader.readLine()) != null) {
// lineCount++;
// if (lineCount == 1000000) {
// long end = System.currentTimeMillis();
// long duration = end - start;
// // print lines per second
// System.out.println(String.format("ConcurrentInputStream lines per second: %,d",
// ((int) Math.floor(lineCount / (duration / 1000.0)))));
// start = end;
// lineCount = 0;
// }

byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8);

if (line.contains("_:")) {
// Write to bnodeOutputStream only
bnodeOutputStream.write(data);
} else {
// Write to a single stream from pipedOutputStreams in a
// round-robin manner
pipedOutputStreams[currentStreamIndex].write(data);
currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length;
}
}
} catch (IOException e) {
// If there's a read error, close everything.
} finally {
// Close all output streams to signal EOF
for (PipedOutputStream out : pipedOutputStreams) {
try {
out.close();
} catch (IOException ignored) {
}
}

try {
bnodeOutputStream.close();
} catch (IOException ignored) {
}
}
});

readerThread.setName("ConcurrentInputStream reader");
readerThread.setDaemon(true);
readerThread.start();
}

/**
* Returns the stream for blank-node lines only.
*/
public InputStream getBnodeStream() {
return bnodeInputStream;
}

/**
* Returns the array of InputStreams that share all concurrently read data.
*/
public InputStream[] getStreams() {
return pipedInputStreams;
}
}
Loading

0 comments on commit ec5a712

Please sign in to comment.