From 271ef277b89c713315a4badc2c25446b6da2277f Mon Sep 17 00:00:00 2001 From: Oliver Siegmar Date: Sat, 19 Aug 2023 08:04:20 +0200 Subject: [PATCH] refactor status listener --- .../siegmar/fastcsv/reader/ChannelStream.java | 12 +++---- .../de/siegmar/fastcsv/reader/CsvScanner.java | 26 ++++++++++----- .../fastcsv/reader/IndexedCsvReader.java | 33 ++++--------------- .../fastcsv/reader/StatusConsumer.java | 11 ------- .../fastcsv/reader/CsvScannerTest.java | 9 +---- 5 files changed, 30 insertions(+), 61 deletions(-) delete mode 100644 src/main/java/de/siegmar/fastcsv/reader/StatusConsumer.java diff --git a/src/main/java/de/siegmar/fastcsv/reader/ChannelStream.java b/src/main/java/de/siegmar/fastcsv/reader/ChannelStream.java index 7d755f6a..91586dc0 100644 --- a/src/main/java/de/siegmar/fastcsv/reader/ChannelStream.java +++ b/src/main/java/de/siegmar/fastcsv/reader/ChannelStream.java @@ -9,8 +9,8 @@ final class ChannelStream { private final ByteBuffer byteBuf = ByteBuffer.allocateDirect(8192); private final ReadableByteChannel channel; - private final StatusConsumer statusConsumer; - private int totalPosition; + private final StatusListener statusListener; + private long totalPosition; private int nextByte; // Keep one buf as Buffer to maintain Android compatibility @@ -18,11 +18,11 @@ final class ChannelStream { // see https://www.morling.dev/blog/bytebuffer-and-the-dreaded-nosuchmethoderror/ private final Buffer buf = byteBuf; - ChannelStream(final ReadableByteChannel channel, final StatusConsumer statusConsumer) + ChannelStream(final ReadableByteChannel channel, final StatusListener statusListener) throws IOException { this.channel = channel; - this.statusConsumer = statusConsumer; + this.statusListener = statusListener; nextByte = loadData() ? byteBuf.get() : -1; } @@ -51,7 +51,7 @@ private boolean loadData() throws IOException { if (readCnt == -1) { return false; } - statusConsumer.addReadBytes(readCnt); + statusListener.readBytes(readCnt); return true; } @@ -64,7 +64,7 @@ private int fetchNextByte() throws IOException { return byteBuf.get(); } - int getTotalPosition() { + long getTotalPosition() { return totalPosition; } diff --git a/src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java b/src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java index 1e3ebf17..36d9af1a 100644 --- a/src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java +++ b/src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.nio.channels.ReadableByteChannel; +import java.util.function.Consumer; final class CsvScanner { @@ -12,27 +13,29 @@ final class CsvScanner { private final byte quoteCharacter; private final CommentStrategy commentStrategy; private final byte commentCharacter; - private final StatusConsumer statusConsumer; + private final Consumer positionConsumer; + private final StatusListener statusListener; private final ChannelStream buf; CsvScanner(final ReadableByteChannel channel, final byte fieldSeparator, final byte quoteCharacter, final CommentStrategy commentStrategy, final byte commentCharacter, - final StatusConsumer statusConsumer) throws IOException { + final Consumer positionConsumer, final StatusListener statusListener) throws IOException { this.fieldSeparator = fieldSeparator; this.quoteCharacter = quoteCharacter; this.commentStrategy = commentStrategy; this.commentCharacter = commentCharacter; - this.statusConsumer = statusConsumer; + this.positionConsumer = positionConsumer; + this.statusListener = statusListener; - buf = new ChannelStream(channel, statusConsumer); + buf = new ChannelStream(channel, statusListener); } @SuppressWarnings({"PMD.AssignmentInOperand", "checkstyle:CyclomaticComplexity", "checkstyle:NestedIfDepth"}) void scan() throws IOException { if (buf.peek() != -1) { - statusConsumer.addRowPosition(0); + addRowPosition(0); } int d; @@ -64,18 +67,23 @@ private void consumeCommentedRow() throws IOException { } if (buf.hasData()) { - statusConsumer.addRowPosition(buf.getTotalPosition()); + addRowPosition(buf.getTotalPosition()); } break; } else if (d == LF) { if (buf.hasData()) { - statusConsumer.addRowPosition(buf.getTotalPosition()); + addRowPosition(buf.getTotalPosition()); } break; } } } + private void addRowPosition(final long totalPosition) { + positionConsumer.accept(totalPosition); + statusListener.readRow(); + } + @SuppressWarnings("PMD.AssignmentInOperand") private boolean consumeQuotedField() throws IOException { int d; @@ -104,12 +112,12 @@ private boolean consumeUnquotedField(int d) throws IOException { } if (buf.hasData()) { - statusConsumer.addRowPosition(buf.getTotalPosition()); + addRowPosition(buf.getTotalPosition()); } return true; } else if (d == LF) { if (buf.hasData()) { - statusConsumer.addRowPosition(buf.getTotalPosition()); + addRowPosition(buf.getTotalPosition()); } return true; } diff --git a/src/main/java/de/siegmar/fastcsv/reader/IndexedCsvReader.java b/src/main/java/de/siegmar/fastcsv/reader/IndexedCsvReader.java index 1ca95ab3..51122876 100644 --- a/src/main/java/de/siegmar/fastcsv/reader/IndexedCsvReader.java +++ b/src/main/java/de/siegmar/fastcsv/reader/IndexedCsvReader.java @@ -39,16 +39,13 @@ @SuppressWarnings({"checkstyle:ClassFanOutComplexity", "checkstyle:ClassDataAbstractionCoupling"}) public final class IndexedCsvReader implements Closeable { - private final List positions = Collections.synchronizedList(new ArrayList<>()); + private final List positions = Collections.synchronizedList(new ArrayList<>()); private final Path file; private final Charset charset; private final char fieldSeparator; private final char quoteCharacter; private final CommentStrategy commentStrategy; private final char commentCharacter; - private final StatusListener statusListener; - - private final StatusConsumerImpl statusConsumer; private final CompletableFuture scanner; private final RandomAccessFile raf; private final RowReader rowReader; @@ -71,16 +68,13 @@ public final class IndexedCsvReader implements Closeable { this.quoteCharacter = quoteCharacter; this.commentStrategy = commentStrategy; this.commentCharacter = commentCharacter; - this.statusListener = statusListener; - - statusConsumer = new StatusConsumerImpl(); statusListener.initialize(Files.size(file)); scanner = CompletableFuture.runAsync(() -> { try (ReadableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.READ)) { new CsvScanner(channel, (byte) fieldSeparator, (byte) quoteCharacter, - commentStrategy, (byte) commentCharacter, statusConsumer).scan(); + commentStrategy, (byte) commentCharacter, positions::add, statusListener).scan(); } catch (final IOException e) { throw new UncheckedIOException(e); } @@ -127,7 +121,7 @@ public CompletableFuture size() { /** * Reads a CSV row by the given row number, returning a {@link CompletableFuture} to - * allow non-blocking read. The result will be available when the requested row has been read. + * allow non-blocking read. The result will be available when the requested row has been found/indexed. * * @param rowNum the row number (0-based) to read from * @return a {@link CsvRow} fetched from the specified {@code rowNum} @@ -193,14 +187,14 @@ public CompletableFuture> readRows(final int firstRow, final int }); } - private void seek(final int row, final int offset) throws IOException { + private void seek(final int row, final long offset) throws IOException { rowReader.resetBuffer(row + 1); raf.seek(offset); } - private CompletableFuture getOffset(final int row) { + private CompletableFuture getOffset(final int row) { if (row == 0) { - return CompletableFuture.completedFuture(0); + return CompletableFuture.completedFuture(0L); } return waitForRow(row) @@ -367,21 +361,6 @@ public IndexedCsvReader build(final Path file, final Charset charset) throws IOE } - private class StatusConsumerImpl implements StatusConsumer { - - @Override - public void addRowPosition(final int position) { - positions.add(position); - statusListener.readRow(); - } - - @Override - public void addReadBytes(final int readCnt) { - statusListener.readBytes(readCnt); - } - - } - private static class ChannelInputStream extends InputStream { private final RandomAccessFile raf; diff --git a/src/main/java/de/siegmar/fastcsv/reader/StatusConsumer.java b/src/main/java/de/siegmar/fastcsv/reader/StatusConsumer.java deleted file mode 100644 index 3ab776f2..00000000 --- a/src/main/java/de/siegmar/fastcsv/reader/StatusConsumer.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.siegmar.fastcsv.reader; - -interface StatusConsumer { - - default void addRowPosition(int position) { - } - - default void addReadBytes(int readCnt) { - } - -} diff --git a/src/test/java/de/siegmar/fastcsv/reader/CsvScannerTest.java b/src/test/java/de/siegmar/fastcsv/reader/CsvScannerTest.java index 276419e4..6ccf3329 100644 --- a/src/test/java/de/siegmar/fastcsv/reader/CsvScannerTest.java +++ b/src/test/java/de/siegmar/fastcsv/reader/CsvScannerTest.java @@ -104,16 +104,9 @@ private static List scan(final String line, final String newLine) { private static List scan(final byte[] data) { final List positions = new ArrayList<>(); - final StatusConsumer statusConsumer = new StatusConsumer() { - @Override - public void addRowPosition(final int position) { - positions.add(position); - } - }; - try { new CsvScanner(Channels.newChannel(new ByteArrayInputStream(data)), (byte) ',', (byte) '"', - CommentStrategy.READ, (byte) '#', statusConsumer).scan(); + CommentStrategy.READ, (byte) '#', p -> positions.add(p.intValue()), new StatusListener() { }).scan(); } catch (final IOException e) { throw new UncheckedIOException(e); }