Skip to content

Commit

Permalink
Add InputStreamIterator
Browse files Browse the repository at this point in the history
This iterator wrapps an InputStream and reads chunks
from it returning the chunks in next() if i has more chunks.

I added two constructors. One that takes the chunk size for bytes
and one that uses DataSize. With DataSize bytes or kiloBytes
only should be used, since too big chunks would defeat the purpose
of this iterator. Its better to read it fully to a byte[].
DataSize uses long, and thus numbers bigger that Integer.MAX will
result in a failing iterator.
  • Loading branch information
eivinhb committed Jun 13, 2024
1 parent 65db4df commit 7059df4
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 0 deletions.
113 changes: 113 additions & 0 deletions src/main/java/no/digipost/io/InputStreamIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.io;

import no.digipost.DiggBase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* InputStreamIterator is an {@link Iterator} reading from an {@link InputStream} in chunks
* where each chunk is returned as the next element in the iterable.
* When the input stream is fully consumed the iterator has no more elements.
*/
public class InputStreamIterator implements Iterator<byte[]> {
private final InputStream inputStream;
private final int chunkSize;
private byte[] next;
private Boolean hasNext;
private boolean endOfStreamReached = false;

/**
* @param inputStream The input stream to iterate over
* @param chunkSize DataSize should not be too big since that defeats the purpose of this iterator.
*/
public InputStreamIterator(InputStream inputStream, DataSize chunkSize) {
this.inputStream = inputStream;
this.chunkSize = (int) chunkSize.toBytes();
}

public InputStreamIterator(InputStream inputStream, int chunkSizeBytes) {
this.inputStream = inputStream;
this.chunkSize = chunkSizeBytes;
}

private byte[] loadNextChunk() {
if (endOfStreamReached) return null;

byte[] chunk = new byte[chunkSize];
int bytesRead = 0;
try {
bytesRead = inputStream.read(chunk);
if (bytesRead == -1) {
endOfStreamReached = true;
inputStream.close();
return null;
}
} catch (IOException e) {
throw new WrappedInputStreamFailed(e, inputStream);
}

if (bytesRead < chunkSize) {
// resize the buffer if less data was read
byte[] smallerBuffer = new byte[bytesRead];
System.arraycopy(chunk, 0, smallerBuffer, 0, bytesRead);
chunk = smallerBuffer;
}

return chunk;
}

/**
* If the iterator fails reading from the wrapped InputStream an
* {@link InputStreamIterator.WrappedInputStreamFailed} runtime exception is thrown.
*
* @return true if the iteration has more elements
*/
@Override
public boolean hasNext() {
if (hasNext == null) {
next = loadNextChunk();
hasNext = (next != null);
}

return hasNext;
}

@Override
public byte[] next() {
if (!hasNext()) {
throw new NoSuchElementException("No more data to read");
}

byte[] result = next;
hasNext = null;
next = null;
return result;
}

public static final class WrappedInputStreamFailed extends RuntimeException {
private static final long serialVersionUID = 1L;

private WrappedInputStreamFailed(Throwable cause, InputStream inputStream) {
super("The InputStream " + DiggBase.friendlyName(inputStream.getClass()) +
" read failed. Cause: " + cause.getClass() + ": " + cause.getMessage(), cause);
}
}
}
108 changes: 108 additions & 0 deletions src/test/java/no/digipost/io/InputStreamIteratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.io;

import no.digipost.io.InputStreamIterator.WrappedInputStreamFailed;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static uk.co.probablyfine.matchers.Java8Matchers.where;

class InputStreamIteratorTest {

@Test
void should_read_the_input_stream_fully() throws Exception {
StringBuilder sb = new StringBuilder();

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {
InputStreamIterator iterator = new InputStreamIterator(inputStream, 2);

while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}
}

assertEquals("Some data", sb.toString());
}

@Test
void should_read_the_input_stream_fully_with_datasize() throws Exception {
StringBuilder sb = new StringBuilder();

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {

InputStreamIterator iterator = new InputStreamIterator(inputStream, DataSize.bytes(2));
while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}
}

assertEquals("Some data", sb.toString());
}

@Test
void too_big_data_size_will_throw_NegativeArraySizeException() throws Exception {
try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8))) {
InputStreamIterator iterator = new InputStreamIterator(inputStream, DataSize.MAX);

assertThrows(NegativeArraySizeException.class, iterator::hasNext);
}
}

@Test
void should_throw_if_next_is_called_with_no_more_elements() throws Exception {
StringBuilder sb = new StringBuilder();

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {

InputStreamIterator iterator = new InputStreamIterator(inputStream, 2);

while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}

assertThrows(NoSuchElementException.class, iterator::next);
}

assertEquals("Some data", sb.toString());
}

@Test
void should_throw_exception_if_input_stream_fails() throws Exception {
try (final InputStream failingInputStream = new InputStream() {

@Override
public int read() throws IOException {
throw new IOException("This input stream is broken");
}
}) {
InputStreamIterator iterator = new InputStreamIterator(failingInputStream, 1);

final WrappedInputStreamFailed ex = assertThrows(WrappedInputStreamFailed.class, iterator::next);
assertThat(ex, where(Exception::getMessage, containsString("InputStreamIteratorTest.")));
}

}
}

0 comments on commit 7059df4

Please sign in to comment.