diff --git a/.gitignore b/.gitignore index deeb8432d..495a5eba9 100755 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,6 @@ data wikidata qendpoint-store/wdbench-indexes wdbench-results +testing +indexing +wdbench-indexes diff --git a/qendpoint-core/pom.xml b/qendpoint-core/pom.xml index d390abb25..a344da24a 100644 --- a/qendpoint-core/pom.xml +++ b/qendpoint-core/pom.xml @@ -17,8 +17,8 @@ org.apache.maven.plugins maven-compiler-plugin - 17 - 17 + 19 + 19 @@ -75,7 +75,7 @@ org.apache.commons commons-compress - 1.21 + 1.27.1 org.apache.jena diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java index 05b47197f..09e208473 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java @@ -18,6 +18,7 @@ import com.the_qa_company.qendpoint.core.util.listener.IntermediateListener; import com.the_qa_company.qendpoint.core.util.string.ByteString; import com.the_qa_company.qendpoint.core.util.string.CompactString; +import org.apache.jena.base.Sys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl source, MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict, @@ -250,7 +252,10 @@ public void createChunk(SizeFetcher fetcher, CloseSuppressPath out } if (tripleID % 100_000 == 0) { - listener.notifyProgress(10, "reading triples " + tripleID); + // use start to measure how many triples are read per second + int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0)); + + listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond); } // too much ram allowed? if (subjects.size() == Integer.MAX_VALUE - 6) { diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java index 788b3ab8a..d0ddca289 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java @@ -1,6 +1,8 @@ package com.the_qa_company.qendpoint.core.iterator.utils; +import java.util.ArrayList; import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -15,6 +17,7 @@ public class AsyncIteratorFetcher implements Supplier { private final Iterator iterator; private final Lock lock = new ReentrantLock(); private boolean end; + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public AsyncIteratorFetcher(Iterator iterator) { this.iterator = iterator; @@ -25,16 +28,33 @@ public AsyncIteratorFetcher(Iterator iterator) { */ @Override public E get() { - lock.lock(); - try { - if (iterator.hasNext()) { - return iterator.next(); + E poll = queue.poll(); + + if (poll != null) { + return poll; + } + + synchronized (this) { + poll = queue.poll(); + if (poll == null) { + if (iterator.hasNext()) { + poll = iterator.next(); + } + ArrayList objects = new ArrayList<>(128); + + for (int i = 0; i < 128 && iterator.hasNext(); i++) { + objects.add(iterator.next()); + } + + queue.addAll(objects); + } + + if (poll == null) { + end = true; } - end = true; - return null; - } finally { - lock.unlock(); + return poll; } + } /** diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java index ab9aa05a0..a6d862ccd 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java @@ -5,6 +5,8 @@ import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; /** @@ -110,7 +112,15 @@ public T get() { } } - private final ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(16); + private final ArrayBlockingQueue>[] queue = new ArrayBlockingQueue[] { + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), + new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024) }; private T next; private boolean end; @@ -118,6 +128,10 @@ public T get() { private Thread thread; + AtomicInteger indexHasNext = new AtomicInteger(0); + + volatile ArrayBlockingQueue> focusQueue; + @Override public boolean hasNext() { if (end) { @@ -129,7 +143,33 @@ public boolean hasNext() { QueueObject obj; try { - obj = queue.take(); + var focusQueue = this.focusQueue; + if (focusQueue != null) { + QueueObject poll = focusQueue.poll(1, TimeUnit.MILLISECONDS); + if (poll != null) { + obj = poll; + } else { + obj = null; + this.focusQueue = null; + } + } else { + obj = null; + } + + if (obj == null) { + + int i = Thread.currentThread().hashCode(); + obj = queue[i % queue.length].poll(10, java.util.concurrent.TimeUnit.MILLISECONDS); + while (obj == null) { + for (ArrayBlockingQueue> queueObjects : queue) { + obj = queueObjects.poll(1, TimeUnit.MILLISECONDS); + if (obj != null) { + break; + } + } + } + } + } catch (InterruptedException e) { throw new PipedIteratorException("Can't read pipe", e); } @@ -162,7 +202,9 @@ public void closePipe() { public void closePipe(Throwable e) { if (e != null) { // clear the queue to force the exception - queue.clear(); + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.clear(); + } if (e instanceof PipedIteratorException) { this.exception = (PipedIteratorException) e; } else { @@ -170,7 +212,9 @@ public void closePipe(Throwable e) { } } try { - queue.put(new EndQueueObject()); + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.put(new EndQueueObject()); + } } catch (InterruptedException ee) { throw new PipedIteratorException("Can't close pipe", ee); } @@ -198,9 +242,25 @@ public Iterator mapWithId(MapIterator.MapWithIdFunction mappingFunc return new MapIterator<>(this, mappingFunction); } + AtomicInteger index = new AtomicInteger(0); + public void addElement(T node) { + int i = Thread.currentThread().hashCode(); + int l = i % queue.length; try { - queue.put(new ElementQueueObject(node)); + boolean success = queue[l].offer(new ElementQueueObject(node), 10, TimeUnit.MILLISECONDS); + if (!success) { + focusQueue = queue[l]; + while (!success) { + for (ArrayBlockingQueue> queueObjects : queue) { + success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS); + if (success) { + break; + } + } + } + } + } catch (InterruptedException ee) { throw new PipedIteratorException("Can't add element to pipe", ee); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java new file mode 100644 index 000000000..b2574cef4 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java @@ -0,0 +1,118 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; + +public class ConcurrentInputStream { + + private final InputStream source; + private final int numberOfStreams; + + private PipedInputStream[] pipedInputStreams; + private PipedOutputStream[] pipedOutputStreams; + + private PipedInputStream bnodeInputStream; + private PipedOutputStream bnodeOutputStream; + + private Thread readerThread; + + public ConcurrentInputStream(InputStream stream, int numberOfStreams) { + this.source = stream; + this.numberOfStreams = numberOfStreams; + setupPipes(); + startReadingThread(); + } + + private void setupPipes() { + pipedInputStreams = new PipedInputStream[numberOfStreams]; + pipedOutputStreams = new PipedOutputStream[numberOfStreams]; + + try { + // Set up main fan-out pipes + for (int i = 0; i < numberOfStreams; i++) { + pipedOutputStreams[i] = new PipedOutputStream(); + pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], 131072 * 1024); + } + + // Set up bnode pipe + bnodeOutputStream = new PipedOutputStream(); + bnodeInputStream = new PipedInputStream(bnodeOutputStream, 131072 * 1024); + + } catch (IOException e) { + throw new RuntimeException("Error creating pipes", e); + } + } + + private void startReadingThread() { + readerThread = new Thread(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) { + String line; + int currentStreamIndex = 0; + long lineCount = 0; + long start = System.currentTimeMillis(); + while ((line = reader.readLine()) != null) { +// lineCount++; +// if (lineCount == 1000000) { +// long end = System.currentTimeMillis(); +// long duration = end - start; +// // print lines per second +// System.out.println(String.format("ConcurrentInputStream lines per second: %,d", +// ((int) Math.floor(lineCount / (duration / 1000.0))))); +// start = end; +// lineCount = 0; +// } + + byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8); + + if (line.contains("_:")) { + // Write to bnodeOutputStream only + bnodeOutputStream.write(data); + } else { + // Write to a single stream from pipedOutputStreams in a + // round-robin manner + pipedOutputStreams[currentStreamIndex].write(data); + currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length; + } + } + } catch (IOException e) { + // If there's a read error, close everything. + } finally { + // Close all output streams to signal EOF + for (PipedOutputStream out : pipedOutputStreams) { + try { + out.close(); + } catch (IOException ignored) { + } + } + + try { + bnodeOutputStream.close(); + } catch (IOException ignored) { + } + } + }); + + readerThread.setName("ConcurrentInputStream reader"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * Returns the stream for blank-node lines only. + */ + public InputStream getBnodeStream() { + return bnodeInputStream; + } + + /** + * Returns the array of InputStreams that share all concurrently read data. + */ + public InputStream[] getStreams() { + return pipedInputStreams; + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java index a89ec1e12..450c8ad39 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java @@ -20,6 +20,8 @@ import java.io.FileNotFoundException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; import com.the_qa_company.qendpoint.core.quad.QuadString; import org.apache.jena.graph.Triple; @@ -44,9 +46,47 @@ public class RDFParserRIOT implements RDFParserCallback { private static final Logger log = LoggerFactory.getLogger(RDFParserRIOT.class); private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer) { + Thread.dumpStack(); + if (keepBNode) { - RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) - .parse(buffer); + ConcurrentInputStream cs = new ConcurrentInputStream(stream, 11); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + Thread e1 = new Thread(() -> { + RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + for (InputStream s : streams) { + int temp = i + 1; + Thread e = new Thread(() -> { + RDFParser.source(s).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +// RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) +// .parse(buffer); } else { RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); } @@ -75,14 +115,13 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) throws ParserException { try { - ElemStringBuffer buffer = new ElemStringBuffer(callback); switch (notation) { - case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, buffer); - case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, buffer); - case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, buffer); - case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, buffer); - case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, buffer); - case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, buffer); + case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, new ElemStringBuffer(callback)); + case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, new ElemStringBuffer(callback)); + case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, new ElemStringBuffer(callback)); + case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, new ElemStringBuffer(callback)); + case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, new ElemStringBuffer(callback)); + case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, new ElemStringBuffer(callback)); default -> throw new NotImplementedException("Parser not found for format " + notation); } } catch (Exception e) { @@ -91,12 +130,13 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo } } - private static class ElemStringBuffer implements StreamRDF { + public static class ElemStringBuffer implements StreamRDF { private final TripleString triple = new TripleString(); private final QuadString quad = new QuadString(); private final RDFCallback callback; + private final static AtomicInteger counter = new AtomicInteger(0); - private ElemStringBuffer(RDFCallback callback) { + public ElemStringBuffer(RDFCallback callback) { this.callback = callback; } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/util/listener/MultiThreadListenerConsole.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/util/listener/MultiThreadListenerConsole.java index a6e78f838..1b28f415f 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/util/listener/MultiThreadListenerConsole.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/util/listener/MultiThreadListenerConsole.java @@ -2,6 +2,8 @@ import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import com.the_qa_company.qendpoint.core.listener.MultiThreadListener; @@ -53,11 +55,11 @@ public MultiThreadListenerConsole(boolean color) { public MultiThreadListenerConsole(boolean color, boolean asciiListener) { this.color = color || ALLOW_COLOR_SEQUENCE; - if (asciiListener) { - threadMessages = new TreeMap<>(); - } else { - threadMessages = null; - } +// if (asciiListener) { + threadMessages = new TreeMap<>(); +// } else { +// threadMessages = null; +// } } public String color(int r, int g, int b) { @@ -140,7 +142,7 @@ public synchronized void notifyProgress(String thread, float level, String messa String msg = colorReset() + progressBar(level) + colorReset() + " " + message; if (threadMessages != null) { threadMessages.put(thread, msg); - render(); +// render(); } else { System.out.println(colorReset() + "[" + colorThread() + thread + colorReset() + "]" + msg); } @@ -160,11 +162,27 @@ public void removeLast() { System.out.print(message); } + { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Executors.newSingleThreadExecutor().submit(() -> { + while (true) { + try { + Thread.sleep(500); + render(); + } catch (InterruptedException e) { + break; + } + executorService.shutdown(); + } + }); + + } + private void render() { render(null); } - private void render(String ln) { + synchronized private void render(String ln) { if (threadMessages == null) { return; } @@ -197,5 +215,6 @@ private void render(String ln) { previous = lines; System.out.print(message); + System.out.flush(); } } diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/HasmacHDTImporterTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/HasmacHDTImporterTest.java new file mode 100644 index 000000000..f3262fa3f --- /dev/null +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/HasmacHDTImporterTest.java @@ -0,0 +1,228 @@ +package com.the_qa_company.qendpoint.core.hdt.impl; + +import com.the_qa_company.qendpoint.core.enums.CompressionType; +import com.the_qa_company.qendpoint.core.enums.RDFNotation; +import com.the_qa_company.qendpoint.core.exceptions.ParserException; +import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIterator; +import com.the_qa_company.qendpoint.core.options.HDTSpecification; +import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; +import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory; +import com.the_qa_company.qendpoint.core.rdf.parsers.ConcurrentInputStream; +import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserRIOT; +import com.the_qa_company.qendpoint.core.triples.TripleString; +import com.the_qa_company.qendpoint.core.triples.impl.utils.HDTTestUtils; +import com.the_qa_company.qendpoint.core.util.io.IOUtil; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFParser; +import org.apache.jena.riot.lang.LabelToNode; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.LongAdder; + +import static org.apache.jena.riot.lang.extra.TurtleJCC.lang; + +public class HasmacHDTImporterTest { + + private final HDTSpecification spec; + + public HasmacHDTImporterTest() { + spec = new HDTSpecification(); + spec.set("loader.type", "one-pass"); + spec.set("loader.bnode.seed", "1234567"); + } + + private Iterator asIt(String file) throws ParserException { + List triples = new ArrayList<>(); + RDFNotation notation = RDFNotation.guess(file); + RDFParserCallback parser = RDFParserFactory.getParserCallback(notation); + parser.doParse(file, HDTTestUtils.BASE_URI, notation, true, (triple, pos) -> { + // force duplication of the triple string data + triples.add(new TripleString(triple.getSubject().toString(), triple.getPredicate().toString(), + triple.getObject().toString())); + }); + return triples.iterator(); + } + + @Test + public void testGz() throws ParserException, IOException { + FileInputStream fileStream = new FileInputStream( + "/Users/havardottestad/Documents/Programming/qEndpoint2/indexing/latest-truthy.nt.gz"); + + try (InputStream uncompressed = IOUtil.asUncompressed(fileStream, CompressionType.GZIP); + BufferedReader reader = new BufferedReader(new InputStreamReader(uncompressed))) { + long sum = 0; + String line; + long startTime = System.currentTimeMillis(); + int lineCount = 0; + int checkpoint = 1000000; + + while ((line = reader.readLine()) != null) { + sum += line.length(); + lineCount++; + if (lineCount == checkpoint) { + long currentTime = System.currentTimeMillis(); + long elapsedTime = currentTime - startTime; // in + // milliseconds + int linesPerSecond = ((int) Math.floor(checkpoint / (elapsedTime / 1000.0))); + + // TODO: print linesPerSecond with thousands separator + System.out.println(String.format("Lines per second: %,d", linesPerSecond)); + + startTime = currentTime; // reset start time for the next + // checkpoint + lineCount = 0; // reset line count for the next checkpoint + } + } + System.out.println(sum); + } + } + + @Test + public void concurentInputStreamTest() throws ParserException, IOException { + try (InputStream fileStream = new FileInputStream( + "/Users/havardottestad/Documents/Programming/qEndpoint2/indexing/latest-truthy.nt.gz")) { + + InputStream uncompressed = IOUtil.asUncompressed(fileStream, CompressionType.GZIP); + + LongAdder longAdder = new LongAdder(); + + ConcurrentInputStream cs = new ConcurrentInputStream(uncompressed, 10); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + Thread e1 = new Thread(() -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(bnodes)); + while (true) { + try { + if (!(reader.readLine() != null)) + break; + } catch (IOException e) { + throw new RuntimeException(e); + } + longAdder.increment(); + } + + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + for (InputStream s : streams) { + int temp = i + 1; + Thread e = new Thread(() -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(s)); + while (true) { + try { + if (!(reader.readLine() != null)) + break; + } catch (IOException e2) { + throw new RuntimeException(e2); + } + longAdder.increment(); + } + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + + new Thread(() -> { + while (true) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(String.format("Lines per second: %,d", longAdder.sumThenReset() / 10)); + + } + }).start(); + + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + } + + @Test + public void concurrentParsing() throws ParserException, IOException { + try (InputStream fileStream = new FileInputStream( + "/Users/havardottestad/Documents/Programming/qEndpoint2/indexing/latest-truthy.nt.gz")) { + + InputStream uncompressed = IOUtil.asUncompressed(fileStream, CompressionType.GZIP); + + LongAdder longAdder = new LongAdder(); + + ConcurrentInputStream cs = new ConcurrentInputStream(uncompressed, 11); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + RDFParser parser1 = RDFParser.source(bnodes).base("").lang(Lang.NTRIPLES) + .labelToNode(LabelToNode.createUseLabelAsGiven()).build(); + Thread e1 = new Thread(() -> { + parser1.parse(new RDFParserRIOT.ElemStringBuffer((triple, pos) -> longAdder.increment())); + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + for (InputStream s : streams) { + int temp = i + 1; + RDFParser parser = RDFParser.source(s).base("").lang(Lang.NTRIPLES) + .labelToNode(LabelToNode.createUseLabelAsGiven()).build(); + Thread e = new Thread(() -> { + parser.parse(new RDFParserRIOT.ElemStringBuffer((triple, pos) -> longAdder.increment())); + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + + new Thread(() -> { + while (true) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(String.format("Lines per second: %,d", longAdder.sumThenReset() / 10)); + } + }).start(); + + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + } + +}