diff --git a/CHANGELOG.md b/CHANGELOG.md index a232fa236..57a1aebbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## [Unreleased] ## [5.0.0] - 2021-01-20 +### Added +* added an asynchronous large file upload method `B2StorageClient.storeLargeFileFromLocalContentAsync` ### Changed `[Incompatible]` * Disable automatic decompressing compressed content in HTTP client download library diff --git a/core/src/main/java/com/backblaze/b2/client/B2AlreadyStoredPartStorer.java b/core/src/main/java/com/backblaze/b2/client/B2AlreadyStoredPartStorer.java index 724404d80..c39ff1a8a 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2AlreadyStoredPartStorer.java +++ b/core/src/main/java/com/backblaze/b2/client/B2AlreadyStoredPartStorer.java @@ -40,7 +40,8 @@ public int getPartNumber() { @Override public B2Part storePart( B2LargeFileStorer largeFileCreationManager, - B2UploadListener uploadListener) { + B2UploadListener uploadListener, + B2CancellationToken cancellationToken) { largeFileCreationManager.updateProgress( uploadListener, diff --git a/core/src/main/java/com/backblaze/b2/client/B2CancellableContentSource.java b/core/src/main/java/com/backblaze/b2/client/B2CancellableContentSource.java new file mode 100644 index 000000000..4cf8a71ea --- /dev/null +++ b/core/src/main/java/com/backblaze/b2/client/B2CancellableContentSource.java @@ -0,0 +1,120 @@ +/* + * Copyright 2021, Backblaze Inc. All Rights Reserved. + * License https://www.backblaze.com/using_b2_code.html + */ +package com.backblaze.b2.client; + +import com.backblaze.b2.client.contentSources.B2ContentSource; +import com.backblaze.b2.client.exceptions.B2Exception; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A ContentSource wrapper that returns an InputStream that will check whether + * the cancellation token says to stop. + */ +public class B2CancellableContentSource implements B2ContentSource { + private final B2ContentSource source; + private final B2CancellationToken cancellationToken; + + public B2CancellableContentSource(B2ContentSource source, B2CancellationToken cancellationToken) { + this.source = source; + this.cancellationToken = cancellationToken; + } + + @Override + public long getContentLength() throws IOException { + return source.getContentLength(); + } + + @Override + public String getSha1OrNull() throws IOException { + return source.getSha1OrNull(); + } + + @Override + public Long getSrcLastModifiedMillisOrNull() throws IOException { + return source.getSrcLastModifiedMillisOrNull(); + } + + @Override + public InputStream createInputStream() throws IOException, B2Exception { + return new CancellableInputStream(source.createInputStream(), cancellationToken); + } + + private static class CancellableInputStream extends InputStream { + private final InputStream source; + private final B2CancellationToken cancellationToken; + + public CancellableInputStream(InputStream source, B2CancellationToken cancellationToken) { + this.source = source; + this.cancellationToken = cancellationToken; + } + + private void throwIfCancelled() throws IOException { + if (cancellationToken.isCancelled()) { + throw new IOException("Request was cancelled by caller"); + } + } + + @Override + public int read() throws IOException { + throwIfCancelled(); + + return source.read(); + } + + @Override + public int read(byte[] b) throws IOException { + throwIfCancelled(); + + return source.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throwIfCancelled(); + + return source.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + throwIfCancelled(); + + return source.skip(n); + } + + @Override + public int available() throws IOException { + throwIfCancelled(); + + return source.available(); + } + + @Override + public void close() throws IOException { + throwIfCancelled(); + + source.close(); + } + + @Override + public synchronized void mark(int readlimit) { + source.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + throwIfCancelled(); + + source.reset(); + } + + @Override + public boolean markSupported() { + return source.markSupported(); + } + } +} diff --git a/core/src/main/java/com/backblaze/b2/client/B2CancellationToken.java b/core/src/main/java/com/backblaze/b2/client/B2CancellationToken.java new file mode 100644 index 000000000..dce0f93ea --- /dev/null +++ b/core/src/main/java/com/backblaze/b2/client/B2CancellationToken.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021, Backblaze Inc. All Rights Reserved. + * License https://www.backblaze.com/using_b2_code.html + */ +package com.backblaze.b2.client; + +import com.backblaze.b2.client.exceptions.B2Exception; +import com.backblaze.b2.client.exceptions.B2LocalException; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Token to pass around whether the caller has cancelled an operation so that + * sub tasks can stop their processing. + */ +class B2CancellationToken { + /** + * The actual cancellation state + */ + private AtomicBoolean cancelled = new AtomicBoolean(false); + + /** + * Throws an exception if cancelled + * @throws B2Exception + */ + void throwIfCancelled() throws B2Exception { + if (cancelled.get()) { + throw new B2LocalException("cancelled", "Request cancelled by caller"); + } + } + + /** + * Sets the cancelled state + */ + void cancel() { + cancelled.set(true); + } + + /** + * Check if the state is cancelled + * @return whether cancel has been called already + */ + boolean isCancelled() { + return cancelled.get(); + } +} diff --git a/core/src/main/java/com/backblaze/b2/client/B2CopyingPartStorer.java b/core/src/main/java/com/backblaze/b2/client/B2CopyingPartStorer.java index dfe6aadc4..f7a0bb03d 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2CopyingPartStorer.java +++ b/core/src/main/java/com/backblaze/b2/client/B2CopyingPartStorer.java @@ -6,10 +6,8 @@ import com.backblaze.b2.client.exceptions.B2CannotComputeException; import com.backblaze.b2.client.exceptions.B2Exception; -import com.backblaze.b2.client.exceptions.B2LocalException; import com.backblaze.b2.client.structures.B2Part; import com.backblaze.b2.client.structures.B2UploadListener; -import com.backblaze.b2.client.structures.B2UploadProgress; import com.backblaze.b2.util.B2ByteRange; import java.util.Objects; @@ -47,9 +45,10 @@ public long getPartSizeOrThrow() throws B2CannotComputeException { @Override public B2Part storePart( B2LargeFileStorer largeFileCreationManager, - B2UploadListener uploadListener) throws B2Exception { + B2UploadListener uploadListener, + B2CancellationToken cancellationToken) throws B2Exception { - return largeFileCreationManager.copyPart(partNumber, sourceFileId, byteRangeOrNull, uploadListener); + return largeFileCreationManager.copyPart(partNumber, sourceFileId, byteRangeOrNull, uploadListener, cancellationToken); } @Override diff --git a/core/src/main/java/com/backblaze/b2/client/B2LargeFileStorer.java b/core/src/main/java/com/backblaze/b2/client/B2LargeFileStorer.java index 79cfd0941..2be1c14a7 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2LargeFileStorer.java +++ b/core/src/main/java/com/backblaze/b2/client/B2LargeFileStorer.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -55,6 +57,11 @@ public class B2LargeFileStorer { */ private final List startingBytePositions; + /** + * The cancellation token used to abort uploads in progress. + */ + private final B2CancellationToken cancellationToken = new B2CancellationToken(); + private final B2AccountAuthorizationCache accountAuthCache; private final B2UploadPartUrlCache uploadPartUrlCache; private final B2StorageClientWebifier webifier; @@ -170,6 +177,35 @@ public static B2LargeFileStorer forLocalContent( } B2FileVersion storeFile(B2UploadListener uploadListenerOrNull) throws B2Exception { + try { + return storeFileAsync(uploadListenerOrNull).get(); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause instanceof B2Exception) { + throw (B2Exception) cause; + } else { + throw new B2LocalException("trouble", "exception while trying to upload parts: " + cause, cause); + } + } catch (InterruptedException e) { + throw new B2LocalException("trouble", "interrupted exception"); + } + } + + /** + * Stores the file asynchronously and returns a CompletableFuture to manage the task. + * + * The returned future can be cancelled, and that will attempt to abort any already started + * uploads by causing them to fail. + * + * Cancelling after the b2_finish_large_file API call has been started will result in the + * future being cancelled, but the API call can still succeed. There is no way to tell from + * the future whether this is the case. The caller is responsible for checking and calling + * B2StorageClient.cancelLargeFile. + * + * @param uploadListenerOrNull upload listener + * @return CompletableFuture that returns the finished file's B2FileVersion + */ + CompletableFuture storeFileAsync(B2UploadListener uploadListenerOrNull) { final B2UploadListener uploadListener; if (uploadListenerOrNull == null) { uploadListener = B2UploadListener.noopListener(); @@ -177,19 +213,98 @@ B2FileVersion storeFile(B2UploadListener uploadListenerOrNull) throws B2Exceptio uploadListener = uploadListenerOrNull; } - final List> partFutures = new ArrayList<>(); + final List> completableFutures = new ArrayList<>(); // Store each part in parallel. for (final B2PartStorer partStorer : partStorers) { - partFutures.add(executor.submit(() -> partStorer.storePart(this, uploadListener))); + CompletableFuture future = CompletableFuture.supplyAsync( + adaptB2Supplier(() -> partStorer.storePart(this, uploadListener, cancellationToken)), + executor); + + completableFutures.add(future); + } + + // future that tracks when all the parts are stored + final CompletableFuture allPartsCompletedFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); + + final List> partFutures = new ArrayList<>(completableFutures); + + // this is the future to return to the caller + final CompletableFuture retval = allPartsCompletedFuture + .thenApplyAsync((voidParam) -> finishLargeFileFromB2PartFuturesInCompletionStage(fileVersion, partFutures), executor); + + // The caller can call cancel on the future that we give them, but that will only + // stop futures chained to the end of this future from running; it does not stop + // processing the parts that may be uploading currently. So we add our own handler + // to detect this and cancel any remaining part uploads so they don't start, and + // flag the cancellation token to try to fail any in-progress uploads. + retval.whenComplete((result, error) -> { + if (error != null) { + completableFutures.forEach(x -> x.cancel(true)); + cancellationToken.cancel(); + } + }); + + return retval; + } + + /** + * Adapts finishLargeFileFromB2PartFutures to be used in completion stages. These + * functions cannot return B2Exceptions, so those must be caught here and converted + * to CompletionExceptions. + * + * @param largeFileVersion + * @param partFutures + * @return + */ + private B2FileVersion finishLargeFileFromB2PartFuturesInCompletionStage(B2FileVersion largeFileVersion, + List> partFutures) { + return callSupplierAndConvertErrorsForCompletableFutures( + () -> finishLargeFileFromB2PartFutures(largeFileVersion, partFutures)); + } + + /** + * Supplier interface that throws B2Exception or IOException + * @param return type + */ + private interface B2Supplier { + Type get() throws B2Exception, IOException; + } + + /** + * Calls the supplier and converts B2Exception or IOException into CompletionException to be + * suitable for use in CompletionStages + * @param supplier supplier function can throw B2Exception or IOException + * @param type the supplier returns + * @return result of the supplier function + */ + Type callSupplierAndConvertErrorsForCompletableFutures(B2Supplier supplier) { + try { + return supplier.get(); + } catch (IOException | B2Exception error) { + throw new CompletionException(error); } + } - return finishLargeFileFromB2PartFutures(fileVersion, partFutures); + /** + * + * @param supplier converts a supplier that can throw B2Exception or IOException into a + * Supplier instance that does not; these exceptions will be converted into + * CompletionExceptions instead. + * + * The resulting supplier is suitable to use in CompletionStages + * @param return type of the supplier + * @return supplier + */ + private Supplier adaptB2Supplier(B2Supplier supplier) { + return () -> callSupplierAndConvertErrorsForCompletableFutures(supplier); } private B2FileVersion finishLargeFileFromB2PartFutures(B2FileVersion largeFileVersion, List> partFutures) throws B2Exception { + cancellationToken.throwIfCancelled(); + final List partSha1s = new ArrayList<>(); try { for (final Future partFuture : partFutures) { @@ -250,7 +365,7 @@ void updateProgress( B2Part uploadPart( int partNumber, B2ContentSource contentSource, - B2UploadListener uploadListener) throws IOException, B2Exception { + B2UploadListener uploadListener, B2CancellationToken cancellationToken) throws IOException, B2Exception { updateProgress( uploadListener, @@ -273,6 +388,7 @@ B2Part uploadPart( "b2_upload_part", accountAuthCache, (isRetry) -> { + cancellationToken.throwIfCancelled(); final B2UploadPartUrlResponse uploadPartUrlResponse = uploadPartUrlCache.get(isRetry); final B2ContentSource contentSourceThatReportsProgress = @@ -329,7 +445,8 @@ B2Part copyPart( int partNumber, String sourceFileId, B2ByteRange byteRangeOrNull, - B2UploadListener uploadListener) throws B2Exception { + B2UploadListener uploadListener, + B2CancellationToken cancellationToken) throws B2Exception { updateProgress( uploadListener, @@ -348,6 +465,8 @@ B2Part copyPart( "b2_copy_part", accountAuthCache, () -> { + cancellationToken.throwIfCancelled(); + updateProgress( uploadListener, partNumber, diff --git a/core/src/main/java/com/backblaze/b2/client/B2PartStorer.java b/core/src/main/java/com/backblaze/b2/client/B2PartStorer.java index 36e362eeb..9ae2dd129 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2PartStorer.java +++ b/core/src/main/java/com/backblaze/b2/client/B2PartStorer.java @@ -6,7 +6,6 @@ import com.backblaze.b2.client.exceptions.B2CannotComputeException; import com.backblaze.b2.client.exceptions.B2Exception; -import com.backblaze.b2.client.exceptions.B2LocalException; import com.backblaze.b2.client.structures.B2Part; import com.backblaze.b2.client.structures.B2UploadListener; @@ -35,11 +34,13 @@ public interface B2PartStorer { * @param largeFileCreationManager The object managing the storage of the whole * large file. * @param uploadListener The listener that tracks upload progress events. + * @param cancellationToken token to check whether the action has been cancelled * @return The part that is stored, if successful. * @throws B2Exception if there's trouble. */ B2Part storePart( B2LargeFileStorer largeFileCreationManager, - B2UploadListener uploadListener) throws IOException, B2Exception; + B2UploadListener uploadListener, + B2CancellationToken cancellationToken) throws IOException, B2Exception; } diff --git a/core/src/main/java/com/backblaze/b2/client/B2StorageClient.java b/core/src/main/java/com/backblaze/b2/client/B2StorageClient.java index 083ea8e93..9f918d3b6 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2StorageClient.java +++ b/core/src/main/java/com/backblaze/b2/client/B2StorageClient.java @@ -46,6 +46,7 @@ import java.io.Closeable; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; /***** @@ -264,6 +265,39 @@ B2FileVersion storeLargeFileFromLocalContent( B2UploadListener uploadListenerOrNull, ExecutorService executor) throws B2Exception; + /** + * Initiates uploading the specified content source as separate parts to form a + * B2 large file. This allows the upload to be cancelled partway through. + * + * This method assumes you have already called startLargeFile(). The return value + * of that call needs to be passed into this method. + * + * The returned future can be cancelled and this will also attempt to stop any part + * uploads in progress. + * + * Cancelling after the b2_finish_large_file API call has been started will result in the + * future being cancelled, but the API call can still succeed. There is no way to tell from + * the future whether this is the case. The caller is responsible for checking and calling + * B2StorageClient.cancelLargeFile. + * + * @param fileVersion The B2FileVersion for the large file getting stored. + * This is the return value of startLargeFile(). + * @param contentSource The contentSource to upload. + * @param uploadListenerOrNull The object that handles upload progress events. + * This may be null if you do not need to be notified + * of progress events. + * @param executor The executor for uploading parts in parallel. The caller + * retains ownership of the executor and is responsible for + * shutting it down. + * @return CompletableFuture with the resulting B2FileVersion of the completed file + * @throws B2Exception on error + */ + CompletableFuture storeLargeFileFromLocalContentAsync( + B2FileVersion fileVersion, + B2ContentSource contentSource, + B2UploadListener uploadListenerOrNull, + ExecutorService executor) throws B2Exception; + /** * Stores a large file, where storing each part may involve different behavior * or byte sources. diff --git a/core/src/main/java/com/backblaze/b2/client/B2StorageClientImpl.java b/core/src/main/java/com/backblaze/b2/client/B2StorageClientImpl.java index 143809ad2..92517e604 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2StorageClientImpl.java +++ b/core/src/main/java/com/backblaze/b2/client/B2StorageClientImpl.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -249,6 +250,26 @@ public B2FileVersion storeLargeFileFromLocalContent( executor).storeFile(uploadListener); } + @Override + public CompletableFuture storeLargeFileFromLocalContentAsync( + B2FileVersion fileVersion, + B2ContentSource contentSource, + B2UploadListener uploadListenerOrNull, + ExecutorService executor) throws B2Exception { + + final B2LargeFileStorer storer = B2LargeFileStorer.forLocalContent( + fileVersion, + contentSource, + getPartSizes(), + accountAuthCache, + webifier, + retryer, + retryPolicySupplier, + executor); + + return storer.storeFileAsync(uploadListenerOrNull); + } + @Override public B2FileVersion storeLargeFile( B2FileVersion fileVersion, diff --git a/core/src/main/java/com/backblaze/b2/client/B2UploadingPartStorer.java b/core/src/main/java/com/backblaze/b2/client/B2UploadingPartStorer.java index 2db69466e..7cb08be07 100644 --- a/core/src/main/java/com/backblaze/b2/client/B2UploadingPartStorer.java +++ b/core/src/main/java/com/backblaze/b2/client/B2UploadingPartStorer.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.util.Objects; - /** * This implementation stores a part of a large file by uploading * the bytes from a B2ContentSource. @@ -44,9 +43,11 @@ public long getPartSizeOrThrow() throws B2CannotComputeException { @Override public B2Part storePart( B2LargeFileStorer largeFileCreationManager, - B2UploadListener uploadListener) throws IOException, B2Exception { + B2UploadListener uploadListener, + B2CancellationToken cancellationToken) throws IOException, B2Exception { - return largeFileCreationManager.uploadPart(partNumber, contentSource, uploadListener); + final B2CancellableContentSource cancellableContentSource = new B2CancellableContentSource(contentSource, cancellationToken); + return largeFileCreationManager.uploadPart(partNumber, cancellableContentSource, uploadListener, cancellationToken); } @Override diff --git a/core/src/test/java/com/backblaze/b2/client/B2AlreadyStoredPartStorerTest.java b/core/src/test/java/com/backblaze/b2/client/B2AlreadyStoredPartStorerTest.java index ead801044..9da341dc4 100644 --- a/core/src/test/java/com/backblaze/b2/client/B2AlreadyStoredPartStorerTest.java +++ b/core/src/test/java/com/backblaze/b2/client/B2AlreadyStoredPartStorerTest.java @@ -23,6 +23,7 @@ public class B2AlreadyStoredPartStorerTest extends B2BaseTest { public void testStorePart() { final B2Part part = new B2Part(fileId(1), 2, 10000000, makeSha1(1), makeMd5(2), 1111); final B2AlreadyStoredPartStorer partStorer = new B2AlreadyStoredPartStorer(part); - assertEquals(part, partStorer.storePart(mock(B2LargeFileStorer.class), uploadListener)); + final B2CancellationToken cancellationToken = new B2CancellationToken(); + assertEquals(part, partStorer.storePart(mock(B2LargeFileStorer.class), uploadListener, cancellationToken)); } } diff --git a/core/src/test/java/com/backblaze/b2/client/B2CancellableContentSourceTest.java b/core/src/test/java/com/backblaze/b2/client/B2CancellableContentSourceTest.java new file mode 100644 index 000000000..4e7ad7f32 --- /dev/null +++ b/core/src/test/java/com/backblaze/b2/client/B2CancellableContentSourceTest.java @@ -0,0 +1,167 @@ +/* + * Copyright 2021, Backblaze Inc. All Rights Reserved. + * License https://www.backblaze.com/using_b2_code.html + */ +package com.backblaze.b2.client; + +import com.backblaze.b2.client.contentSources.B2ContentSource; +import com.backblaze.b2.client.exceptions.B2Exception; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.io.InputStream; + +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class B2CancellableContentSourceTest { + + final InputStream mockInputStream = mock(InputStream.class); + final B2ContentSource mockContentSource = mock(B2ContentSource.class); + final B2CancellationToken cancellationToken = new B2CancellationToken(); + + final B2CancellableContentSource cancellableContentSource = new B2CancellableContentSource(mockContentSource, cancellationToken); + + final long EXPECTED_CONTENT_LENGTH = 100; + final String EXPECTED_SHA1 = "0000000000000000000000000000000000000000"; + final Long EXPECTED_SRC_MODIFIED = 123L; + final int EXPECTED_READ_RESULT = 1; + final long EXPECTED_SKIP_RESULT = 2; + final boolean EXPECTED_MARK_SUPPORTED_RESULT = true; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setup() throws B2Exception, IOException { + when(mockContentSource.createInputStream()).thenReturn(mockInputStream); + + // set return values + when(mockContentSource.getContentLength()).thenReturn(EXPECTED_CONTENT_LENGTH); + when(mockContentSource.getSha1OrNull()).thenReturn(EXPECTED_SHA1); + when(mockContentSource.getSrcLastModifiedMillisOrNull()).thenReturn(EXPECTED_SRC_MODIFIED); + + when(mockInputStream.read()).thenReturn(EXPECTED_READ_RESULT); + when(mockInputStream.read(anyObject())).thenReturn(EXPECTED_READ_RESULT); + when(mockInputStream.read(anyObject(), anyInt(), anyInt())).thenReturn(EXPECTED_READ_RESULT); + + when(mockInputStream.skip(anyLong())).thenReturn(EXPECTED_SKIP_RESULT); + when(mockInputStream.markSupported()).thenReturn(EXPECTED_MARK_SUPPORTED_RESULT); + } + + @Test + public void testPassthroughMethods() throws IOException { + Assert.assertEquals(EXPECTED_CONTENT_LENGTH, cancellableContentSource.getContentLength()); + verify(mockContentSource).getContentLength(); + + Assert.assertEquals(EXPECTED_SHA1, cancellableContentSource.getSha1OrNull()); + verify(mockContentSource).getSha1OrNull(); + + Assert.assertEquals(EXPECTED_SRC_MODIFIED, cancellableContentSource.getSrcLastModifiedMillisOrNull()); + verify(mockContentSource).getSrcLastModifiedMillisOrNull(); + } + + @Test + public void testInputStreamPassthrough() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + + Assert.assertEquals(EXPECTED_READ_RESULT, inputStream.read()); + verify(mockInputStream).read(); + + final byte[] bytes = new byte[10]; + Assert.assertEquals(EXPECTED_READ_RESULT, inputStream.read(bytes)); + verify(mockInputStream).read(eq(bytes)); + + Assert.assertEquals(EXPECTED_READ_RESULT, inputStream.read(bytes, 0, 10)); + verify(mockInputStream).read(eq(bytes), eq(0), eq(10)); + + Assert.assertEquals(EXPECTED_SKIP_RESULT, inputStream.skip(1)); + verify(mockInputStream).skip(1); + + inputStream.close(); + verify(mockInputStream).close(); + + inputStream.mark(2); + verify(mockInputStream).mark(2); + + inputStream.reset(); + verify(mockInputStream).reset(); + + Assert.assertEquals(EXPECTED_MARK_SUPPORTED_RESULT, inputStream.markSupported()); + verify(mockInputStream).markSupported(); + } + + @Test + public void testInputStreamReadCancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.read(); + } + + @Test + public void testInputStreamRead2Cancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.read(new byte[1]); + } + + @Test + public void testInputStreamRead3Cancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.read(new byte[10], 0, 10); + } + + @Test + public void testInputStreamSkipCancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.skip(1); + } + + @Test + public void testInputStreamAvailableCancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.available(); + } + + @Test + public void testInputStreamCloseCancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.close(); + } + + @Test + public void testInputStreamResetCancelled() throws B2Exception, IOException { + final InputStream inputStream = cancellableContentSource.createInputStream(); + cancellationToken.cancel(); + + thrown.expect(IOException.class); + inputStream.reset(); + } +} + + diff --git a/core/src/test/java/com/backblaze/b2/client/B2CancellationTokenTest.java b/core/src/test/java/com/backblaze/b2/client/B2CancellationTokenTest.java new file mode 100644 index 000000000..e6c8fd5ce --- /dev/null +++ b/core/src/test/java/com/backblaze/b2/client/B2CancellationTokenTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021, Backblaze Inc. All Rights Reserved. + * License https://www.backblaze.com/using_b2_code.html + */ +package com.backblaze.b2.client; + +import com.backblaze.b2.client.exceptions.B2Exception; +import com.backblaze.b2.client.exceptions.B2LocalException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Tests for B2CancellationToken + */ +public class B2CancellationTokenTest { + private final B2CancellationToken cancellationToken = new B2CancellationToken(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testThrowIfCancelledDoesThrow() throws B2Exception { + cancellationToken.cancel(); + + thrown.expect(B2LocalException.class); + cancellationToken.throwIfCancelled(); + } + + @Test + public void testThrowIfCancelledDoesNotThrow() throws B2Exception { + cancellationToken.throwIfCancelled(); + } + + @Test + public void testCancel() { + // initial state + Assert.assertFalse(cancellationToken.isCancelled()); + + cancellationToken.cancel(); + + Assert.assertTrue(cancellationToken.isCancelled()); + + // can cancel again + cancellationToken.cancel(); + + Assert.assertTrue(cancellationToken.isCancelled()); + } +} diff --git a/core/src/test/java/com/backblaze/b2/client/B2CopyingPartStorerTest.java b/core/src/test/java/com/backblaze/b2/client/B2CopyingPartStorerTest.java index 3f8def0a7..7566d4bb2 100644 --- a/core/src/test/java/com/backblaze/b2/client/B2CopyingPartStorerTest.java +++ b/core/src/test/java/com/backblaze/b2/client/B2CopyingPartStorerTest.java @@ -36,23 +36,25 @@ public class B2CopyingPartStorerTest extends B2BaseTest { private final B2UploadListener uploadListener = mock(B2UploadListener.class); public B2CopyingPartStorerTest() throws B2Exception { - when(largeFileStorer.copyPart(anyInt(), anyString(), anyObject(), anyObject())).thenReturn(part); + when(largeFileStorer.copyPart(anyInt(), anyString(), anyObject(), anyObject(), anyObject())).thenReturn(part); } @Test public void testStorePart_noByteRange() throws B2Exception { final B2CopyingPartStorer partStorer = new B2CopyingPartStorer(PART_NUMBER, SOURCE_FILE_ID); + final B2CancellationToken cancellationToken = new B2CancellationToken(); - assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener)); - verify(largeFileStorer).copyPart(2, SOURCE_FILE_ID, null, uploadListener); + assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener, cancellationToken)); + verify(largeFileStorer).copyPart(2, SOURCE_FILE_ID, null, uploadListener, cancellationToken); } @Test public void testStorePart_byteRange() throws B2Exception { final B2ByteRange byteRange = B2ByteRange.between(1000000, 2000000); final B2CopyingPartStorer partStorer = new B2CopyingPartStorer(2, SOURCE_FILE_ID, byteRange); + final B2CancellationToken cancellationToken = new B2CancellationToken(); - assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener)); - verify(largeFileStorer).copyPart(2, SOURCE_FILE_ID, byteRange, uploadListener); + assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener, cancellationToken)); + verify(largeFileStorer).copyPart(2, SOURCE_FILE_ID, byteRange, uploadListener, cancellationToken); } } diff --git a/core/src/test/java/com/backblaze/b2/client/B2LargeFileStorerTest.java b/core/src/test/java/com/backblaze/b2/client/B2LargeFileStorerTest.java index 8b713fed5..210d2d6a1 100644 --- a/core/src/test/java/com/backblaze/b2/client/B2LargeFileStorerTest.java +++ b/core/src/test/java/com/backblaze/b2/client/B2LargeFileStorerTest.java @@ -17,6 +17,7 @@ import com.backblaze.b2.client.structures.B2UploadState; import com.backblaze.b2.util.B2BaseTest; import org.junit.After; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -28,8 +29,13 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -76,7 +82,7 @@ public class B2LargeFileStorerTest extends B2BaseTest { private final Supplier retryPolicySupplier = B2DefaultRetryPolicy.supplier(); // Use an executor that has a predictable order of events. private final ExecutorService executor = new ExecutorThatUsesMainThread(); - + private final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor(); private final B2UploadListener uploadListenerMock = mock(B2UploadListener.class); @Rule @@ -96,6 +102,7 @@ public B2LargeFileStorerTest() throws B2Exception { @After public void tearDown() { executor.shutdown(); + singleThreadedExecutor.shutdown(); } public List createB2LargeFileStorerAndGetSortedPartStorers(List outOfOrderPartStorers) { @@ -251,10 +258,46 @@ private void storeFile(B2UploadListener uploadListener) throws IOException, B2Ex largeFileStorer.storeFile(uploadListener); } + private void storeFileAsync(B2UploadListener uploadListener) throws IOException, ExecutionException, InterruptedException { + final List partStorers = new ArrayList<>(); + final B2ContentSource contentSourceForPart1 = mock(B2ContentSource.class); + when(contentSourceForPart1.getContentLength()).thenReturn(PART_SIZE_FOR_FIRST_TWO); + partStorers.add(new B2UploadingPartStorer(1, contentSourceForPart1)); + + final String copySourceFile = fileId(1); + partStorers.add(new B2CopyingPartStorer(2, copySourceFile)); + + partStorers.add(new B2AlreadyStoredPartStorer(part3)); + + final B2LargeFileStorer largeFileStorer = new B2LargeFileStorer( + largeFileVersion, + partStorers, + authCache, + webifier, + retryer, + retryPolicySupplier, + executor); + + assertEquals(partStorers, largeFileStorer.getPartStorers()); + + largeFileStorer.storeFileAsync(uploadListener).get(); + } + @Test public void testStoreFile_success() throws IOException, B2Exception { storeFile(uploadListenerMock); + verifySuccessfulUpload(); + } + + @Test + public void testStoreFileAsync_success() throws IOException, B2Exception, ExecutionException, InterruptedException { + storeFileAsync(uploadListenerMock); + + verifySuccessfulUpload(); + } + + private void verifySuccessfulUpload() throws B2Exception { // Checks for the part that is uploaded. verify(webifier).getUploadPartUrl(anyObject(), anyObject()); verify(webifier).uploadPart(anyObject(), anyObject()); @@ -310,47 +353,116 @@ public void testStoreFile_cannotUpload() throws B2Exception { storeFile(uploadListenerMock); fail("should have thrown"); } catch (B2InternalErrorException e) { - // Make sure retries work - verify(webifier, times(8)).uploadPart(anyObject(), anyObject()); + checkCannotUpload(); + } catch (Exception e) { + fail("should have thrown B2InternalErrorException"); + } + } + @Test + public void testStoreFileAsync_cannotUpload() throws B2Exception { + when(webifier.uploadPart(anyObject(), anyObject())).thenThrow(new B2InternalErrorException("error")); + + try { + storeFileAsync(uploadListenerMock); + fail("should have thrown"); + } catch (ExecutionException e) { + assertEquals(B2InternalErrorException.class, e.getCause().getClass()); + + checkCannotUpload(); + } catch (Exception e) { + fail("should have thrown ExecutionException"); + } + } + + private void checkCannotUpload() throws B2Exception { + + // Make sure retries work + verify(webifier, times(8)).uploadPart(anyObject(), anyObject()); + verify(webifier, times(0)).finishLargeFile(anyObject(), anyObject()); + + // Make sure progress events are as expected. + // There should be 2 WAITING_TO_START from the uploading and copying part storers + // There should be 2 STARTING from the uploading and copying part storers + // There should be 1 FAILED (for the part that uploads) + // There should be 2 SUCCEEDED (one for each remaining part) + // Things to note: for copies we don't really know the number of bytes that will be copied. Even if a byte range + // is supplied in the copy, the range may exceed the file's size, and so it may get clamped during the actual + // copy operation. We use 1 byte as the placeholder until the copy succeeds, then use the result to update the + // real part size. + verify(uploadListenerMock).progress( + new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.WAITING_TO_START)); + verify(uploadListenerMock, times(8)).progress( + new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.STARTING)); + verify(uploadListenerMock).progress( + new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.FAILED)); + verify(uploadListenerMock).progress( + new B2UploadProgress(1, 3, PART_SIZE_FOR_FIRST_TWO, 1, 0, B2UploadState.WAITING_TO_START)); + verify(uploadListenerMock).progress( + new B2UploadProgress(1, 3, PART_SIZE_FOR_FIRST_TWO, 1, 0, B2UploadState.STARTING)); + verify(uploadListenerMock).progress( + new B2UploadProgress( + 1, + 3, + PART_SIZE_FOR_FIRST_TWO, + PART_SIZE_FOR_FIRST_TWO, + PART_SIZE_FOR_FIRST_TWO, + B2UploadState.SUCCEEDED)); + verify(uploadListenerMock).progress( + new B2UploadProgress( + 2, + 3, + B2UploadProgress.UNKNOWN_PART_START_BYTE, + LAST_PART_SIZE, + LAST_PART_SIZE, + B2UploadState.SUCCEEDED)); + } + + @Test + public void testStoreFileAsyncCancelled() throws B2Exception, IOException, ExecutionException, InterruptedException { + final List partStorers = new ArrayList<>(); + final B2ContentSource contentSourceForPart1 = mock(B2ContentSource.class); + final B2ContentSource contentSourceForPart2 = mock(B2ContentSource.class); + + final AtomicReference> future = new AtomicReference<>(); + + when(contentSourceForPart1.getContentLength()).thenReturn(PART_SIZE_FOR_FIRST_TWO); + when(contentSourceForPart2.getContentLength()).thenReturn(PART_SIZE_FOR_FIRST_TWO); + + // when the first call to uploadPart is called, we will cancel the request + when(webifier.uploadPart(any(), any())).thenAnswer(invocation -> { + future.get().cancel(true); + return null; + }); + + partStorers.add(new B2UploadingPartStorer(1, contentSourceForPart1)); + partStorers.add(new B2UploadingPartStorer(2, contentSourceForPart2)); + partStorers.add(new B2AlreadyStoredPartStorer(part3)); + + final B2LargeFileStorer largeFileStorer = new B2LargeFileStorer( + largeFileVersion, + partStorers, + authCache, + webifier, + retryer, + retryPolicySupplier, + singleThreadedExecutor); + + assertEquals(partStorers, largeFileStorer.getPartStorers()); + + future.set(largeFileStorer.storeFileAsync(uploadListenerMock)); + + try { + + future.get().get(); + Assert.fail("we should have gotten a CancellationException"); + } catch (CancellationException e) { + + // upload part should only be called once + verify(webifier, times(1)).uploadPart(anyObject(), anyObject()); verify(webifier, times(0)).finishLargeFile(anyObject(), anyObject()); - // Make sure progress events are as expected. - // There should be 2 WAITING_TO_START from the uploading and copying part storers - // There should be 2 STARTING from the uploading and copying part storers - // There should be 1 FAILED (for the part that uploads) - // There should be 2 SUCCEEDED (one for each remaining part) - // Things to note: for copies we don't really know the number of bytes that will be copied. Even if a byte range - // is supplied in the copy, the range may exceed the file's size, and so it may get clamped during the actual - // copy operation. We use 1 byte as the placeholder until the copy succeeds, then use the result to update the - // real part size. - verify(uploadListenerMock).progress( - new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.WAITING_TO_START)); - verify(uploadListenerMock, times(8)).progress( - new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.STARTING)); - verify(uploadListenerMock).progress( - new B2UploadProgress(0, 3, 0, PART_SIZE_FOR_FIRST_TWO, 0, B2UploadState.FAILED)); - verify(uploadListenerMock).progress( - new B2UploadProgress(1, 3, PART_SIZE_FOR_FIRST_TWO, 1, 0, B2UploadState.WAITING_TO_START)); - verify(uploadListenerMock).progress( - new B2UploadProgress(1, 3, PART_SIZE_FOR_FIRST_TWO, 1, 0, B2UploadState.STARTING)); - verify(uploadListenerMock).progress( - new B2UploadProgress( - 1, - 3, - PART_SIZE_FOR_FIRST_TWO, - PART_SIZE_FOR_FIRST_TWO, - PART_SIZE_FOR_FIRST_TWO, - B2UploadState.SUCCEEDED)); - verify(uploadListenerMock).progress( - new B2UploadProgress( - 2, - 3, - B2UploadProgress.UNKNOWN_PART_START_BYTE, - LAST_PART_SIZE, - LAST_PART_SIZE, - B2UploadState.SUCCEEDED)); } catch (Exception e) { - fail("should have thrown B2InternalErrorException"); + Assert.fail("we should have gotten a CancellationException"); } } diff --git a/core/src/test/java/com/backblaze/b2/client/B2UploadingPartStorerTest.java b/core/src/test/java/com/backblaze/b2/client/B2UploadingPartStorerTest.java index cb1c1e330..6251de172 100644 --- a/core/src/test/java/com/backblaze/b2/client/B2UploadingPartStorerTest.java +++ b/core/src/test/java/com/backblaze/b2/client/B2UploadingPartStorerTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,10 +40,11 @@ public void testStorePart() throws IOException, B2Exception { final B2ContentSource contentSource = mock(B2ContentSource.class); final B2UploadingPartStorer partStorer = new B2UploadingPartStorer(PART_NUMBER, contentSource); final B2LargeFileStorer largeFileStorer = mock(B2LargeFileStorer.class); + final B2CancellationToken cancellationToken = new B2CancellationToken(); - when(largeFileStorer.uploadPart(anyInt(), anyObject(), anyObject())).thenReturn(part); + when(largeFileStorer.uploadPart(anyInt(), anyObject(), anyObject(), eq(cancellationToken))).thenReturn(part); - assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener)); - verify(largeFileStorer).uploadPart(2, contentSource, uploadListener); + assertEquals(part, partStorer.storePart(largeFileStorer, uploadListener, cancellationToken)); + verify(largeFileStorer).uploadPart(eq(2), anyObject(), eq(uploadListener), eq(cancellationToken)); } }