Skip to content

Commit

Permalink
refactor status listener
Browse files Browse the repository at this point in the history
  • Loading branch information
osiegmar committed Aug 19, 2023
1 parent 03070df commit 271ef27
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 61 deletions.
12 changes: 6 additions & 6 deletions src/main/java/de/siegmar/fastcsv/reader/ChannelStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ final class ChannelStream {

private final ByteBuffer byteBuf = ByteBuffer.allocateDirect(8192);
private final ReadableByteChannel channel;
private final StatusConsumer statusConsumer;
private int totalPosition;
private final StatusListener statusListener;
private long totalPosition;
private int nextByte;

// Keep one buf as Buffer to maintain Android compatibility
// otherwise calls to clear() and flip() cause NoSuchMethodError
// see https://www.morling.dev/blog/bytebuffer-and-the-dreaded-nosuchmethoderror/
private final Buffer buf = byteBuf;

ChannelStream(final ReadableByteChannel channel, final StatusConsumer statusConsumer)
ChannelStream(final ReadableByteChannel channel, final StatusListener statusListener)
throws IOException {

this.channel = channel;
this.statusConsumer = statusConsumer;
this.statusListener = statusListener;
nextByte = loadData() ? byteBuf.get() : -1;
}

Expand Down Expand Up @@ -51,7 +51,7 @@ private boolean loadData() throws IOException {
if (readCnt == -1) {
return false;
}
statusConsumer.addReadBytes(readCnt);
statusListener.readBytes(readCnt);
return true;
}

Expand All @@ -64,7 +64,7 @@ private int fetchNextByte() throws IOException {
return byteBuf.get();
}

int getTotalPosition() {
long getTotalPosition() {
return totalPosition;
}

Expand Down
26 changes: 17 additions & 9 deletions src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

final class CsvScanner {

Expand All @@ -12,27 +13,29 @@ final class CsvScanner {
private final byte quoteCharacter;
private final CommentStrategy commentStrategy;
private final byte commentCharacter;
private final StatusConsumer statusConsumer;
private final Consumer<Long> positionConsumer;
private final StatusListener statusListener;
private final ChannelStream buf;

CsvScanner(final ReadableByteChannel channel, final byte fieldSeparator, final byte quoteCharacter,
final CommentStrategy commentStrategy, final byte commentCharacter,
final StatusConsumer statusConsumer) throws IOException {
final Consumer<Long> positionConsumer, final StatusListener statusListener) throws IOException {

this.fieldSeparator = fieldSeparator;
this.quoteCharacter = quoteCharacter;
this.commentStrategy = commentStrategy;
this.commentCharacter = commentCharacter;
this.statusConsumer = statusConsumer;
this.positionConsumer = positionConsumer;
this.statusListener = statusListener;

buf = new ChannelStream(channel, statusConsumer);
buf = new ChannelStream(channel, statusListener);
}

@SuppressWarnings({"PMD.AssignmentInOperand", "checkstyle:CyclomaticComplexity",
"checkstyle:NestedIfDepth"})
void scan() throws IOException {
if (buf.peek() != -1) {
statusConsumer.addRowPosition(0);
addRowPosition(0);
}

int d;
Expand Down Expand Up @@ -64,18 +67,23 @@ private void consumeCommentedRow() throws IOException {
}

if (buf.hasData()) {
statusConsumer.addRowPosition(buf.getTotalPosition());
addRowPosition(buf.getTotalPosition());
}
break;
} else if (d == LF) {
if (buf.hasData()) {
statusConsumer.addRowPosition(buf.getTotalPosition());
addRowPosition(buf.getTotalPosition());
}
break;
}
}
}

private void addRowPosition(final long totalPosition) {
positionConsumer.accept(totalPosition);
statusListener.readRow();
}

@SuppressWarnings("PMD.AssignmentInOperand")
private boolean consumeQuotedField() throws IOException {
int d;
Expand Down Expand Up @@ -104,12 +112,12 @@ private boolean consumeUnquotedField(int d) throws IOException {
}

if (buf.hasData()) {
statusConsumer.addRowPosition(buf.getTotalPosition());
addRowPosition(buf.getTotalPosition());
}
return true;
} else if (d == LF) {
if (buf.hasData()) {
statusConsumer.addRowPosition(buf.getTotalPosition());
addRowPosition(buf.getTotalPosition());
}
return true;
}
Expand Down
33 changes: 6 additions & 27 deletions src/main/java/de/siegmar/fastcsv/reader/IndexedCsvReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,13 @@
@SuppressWarnings({"checkstyle:ClassFanOutComplexity", "checkstyle:ClassDataAbstractionCoupling"})
public final class IndexedCsvReader implements Closeable {

private final List<Integer> positions = Collections.synchronizedList(new ArrayList<>());
private final List<Long> positions = Collections.synchronizedList(new ArrayList<>());
private final Path file;
private final Charset charset;
private final char fieldSeparator;
private final char quoteCharacter;
private final CommentStrategy commentStrategy;
private final char commentCharacter;
private final StatusListener statusListener;

private final StatusConsumerImpl statusConsumer;
private final CompletableFuture<Void> scanner;
private final RandomAccessFile raf;
private final RowReader rowReader;
Expand All @@ -71,16 +68,13 @@ public final class IndexedCsvReader implements Closeable {
this.quoteCharacter = quoteCharacter;
this.commentStrategy = commentStrategy;
this.commentCharacter = commentCharacter;
this.statusListener = statusListener;

statusConsumer = new StatusConsumerImpl();

statusListener.initialize(Files.size(file));

scanner = CompletableFuture.runAsync(() -> {
try (ReadableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.READ)) {
new CsvScanner(channel, (byte) fieldSeparator, (byte) quoteCharacter,
commentStrategy, (byte) commentCharacter, statusConsumer).scan();
commentStrategy, (byte) commentCharacter, positions::add, statusListener).scan();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -127,7 +121,7 @@ public CompletableFuture<Integer> size() {

/**
* Reads a CSV row by the given row number, returning a {@link CompletableFuture} to
* allow non-blocking read. The result will be available when the requested row has been read.
* allow non-blocking read. The result will be available when the requested row has been found/indexed.
*
* @param rowNum the row number (0-based) to read from
* @return a {@link CsvRow} fetched from the specified {@code rowNum}
Expand Down Expand Up @@ -193,14 +187,14 @@ public CompletableFuture<Stream<CsvRow>> readRows(final int firstRow, final int
});
}

private void seek(final int row, final int offset) throws IOException {
private void seek(final int row, final long offset) throws IOException {
rowReader.resetBuffer(row + 1);
raf.seek(offset);
}

private CompletableFuture<Integer> getOffset(final int row) {
private CompletableFuture<Long> getOffset(final int row) {
if (row == 0) {
return CompletableFuture.completedFuture(0);
return CompletableFuture.completedFuture(0L);
}

return waitForRow(row)
Expand Down Expand Up @@ -367,21 +361,6 @@ public IndexedCsvReader build(final Path file, final Charset charset) throws IOE

}

private class StatusConsumerImpl implements StatusConsumer {

@Override
public void addRowPosition(final int position) {
positions.add(position);
statusListener.readRow();
}

@Override
public void addReadBytes(final int readCnt) {
statusListener.readBytes(readCnt);
}

}

private static class ChannelInputStream extends InputStream {

private final RandomAccessFile raf;
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/de/siegmar/fastcsv/reader/StatusConsumer.java

This file was deleted.

9 changes: 1 addition & 8 deletions src/test/java/de/siegmar/fastcsv/reader/CsvScannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,9 @@ private static List<Integer> scan(final String line, final String newLine) {
private static List<Integer> scan(final byte[] data) {
final List<Integer> positions = new ArrayList<>();

final StatusConsumer statusConsumer = new StatusConsumer() {
@Override
public void addRowPosition(final int position) {
positions.add(position);
}
};

try {
new CsvScanner(Channels.newChannel(new ByteArrayInputStream(data)), (byte) ',', (byte) '"',
CommentStrategy.READ, (byte) '#', statusConsumer).scan();
CommentStrategy.READ, (byte) '#', p -> positions.add(p.intValue()), new StatusListener() { }).scan();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand Down

0 comments on commit 271ef27

Please sign in to comment.