Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding storeLargeFileFromLocalContentAsync method to B2StorageClient #145

Merged
merged 7 commits into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## [Unreleased]

## [5.0.0] - 2021-01-20
### Added
* added an asynchronous large file upload method `B2StorageClient.storeLargeFileFromLocalContentAsync`
### Changed `[Incompatible]`
* Disable automatic decompressing compressed content in HTTP client download library

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public int getPartNumber() {
@Override
public B2Part storePart(
B2LargeFileStorer largeFileCreationManager,
B2UploadListener uploadListener) {
B2UploadListener uploadListener,
B2CancellationToken cancellationToken) {

largeFileCreationManager.updateProgress(
uploadListener,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2021, Backblaze Inc. All Rights Reserved.
* License https://www.backblaze.com/using_b2_code.html
*/
package com.backblaze.b2.client;

import com.backblaze.b2.client.contentSources.B2ContentSource;
import com.backblaze.b2.client.exceptions.B2Exception;

import java.io.IOException;
import java.io.InputStream;

/**
* A ContentSource wrapper that returns an InputStream that will check whether
* the cancellation token says to stop.
*/
public class B2CancellableContentSource implements B2ContentSource {
private final B2ContentSource source;
private final B2CancellationToken cancellationToken;

public B2CancellableContentSource(B2ContentSource source, B2CancellationToken cancellationToken) {
this.source = source;
this.cancellationToken = cancellationToken;
}

@Override
public long getContentLength() throws IOException {
return source.getContentLength();
}

@Override
public String getSha1OrNull() throws IOException {
return source.getSha1OrNull();
}

@Override
public Long getSrcLastModifiedMillisOrNull() throws IOException {
return source.getSrcLastModifiedMillisOrNull();
}

@Override
public InputStream createInputStream() throws IOException, B2Exception {
return new CancellableInputStream(source.createInputStream(), cancellationToken);
}

private static class CancellableInputStream extends InputStream {
private final InputStream source;
private final B2CancellationToken cancellationToken;

public CancellableInputStream(InputStream source, B2CancellationToken cancellationToken) {
this.source = source;
this.cancellationToken = cancellationToken;
}

private void throwIfCancelled() throws IOException {
if (cancellationToken.isCancelled()) {
throw new IOException("Request was cancelled by caller");
}
}

@Override
public int read() throws IOException {
throwIfCancelled();

return source.read();
}

@Override
public int read(byte[] b) throws IOException {
throwIfCancelled();

return source.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
throwIfCancelled();

return source.read(b, off, len);
}

@Override
public long skip(long n) throws IOException {
throwIfCancelled();

return source.skip(n);
}

@Override
public int available() throws IOException {
throwIfCancelled();

return source.available();
}

@Override
public void close() throws IOException {
throwIfCancelled();

source.close();
}

@Override
public synchronized void mark(int readlimit) {
source.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
throwIfCancelled();

source.reset();
}

@Override
public boolean markSupported() {
return source.markSupported();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021, Backblaze Inc. All Rights Reserved.
* License https://www.backblaze.com/using_b2_code.html
*/
package com.backblaze.b2.client;

import com.backblaze.b2.client.exceptions.B2Exception;
import com.backblaze.b2.client.exceptions.B2LocalException;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Token to pass around whether the caller has cancelled an operation so that
* sub tasks can stop their processing.
*/
class B2CancellationToken {
/**
* The actual cancellation state
*/
private AtomicBoolean cancelled = new AtomicBoolean(false);

/**
* Throws an exception if cancelled
* @throws B2Exception
*/
void throwIfCancelled() throws B2Exception {
if (cancelled.get()) {
throw new B2LocalException("cancelled", "Request cancelled by caller");
}
}

/**
* Sets the cancelled state
*/
void cancel() {
cancelled.set(true);
}

/**
* Check if the state is cancelled
* @return whether cancel has been called already
*/
boolean isCancelled() {
return cancelled.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

import com.backblaze.b2.client.exceptions.B2CannotComputeException;
import com.backblaze.b2.client.exceptions.B2Exception;
import com.backblaze.b2.client.exceptions.B2LocalException;
import com.backblaze.b2.client.structures.B2Part;
import com.backblaze.b2.client.structures.B2UploadListener;
import com.backblaze.b2.client.structures.B2UploadProgress;
import com.backblaze.b2.util.B2ByteRange;

import java.util.Objects;
Expand Down Expand Up @@ -47,9 +45,10 @@ public long getPartSizeOrThrow() throws B2CannotComputeException {
@Override
public B2Part storePart(
B2LargeFileStorer largeFileCreationManager,
B2UploadListener uploadListener) throws B2Exception {
B2UploadListener uploadListener,
B2CancellationToken cancellationToken) throws B2Exception {

return largeFileCreationManager.copyPart(partNumber, sourceFileId, byteRangeOrNull, uploadListener);
return largeFileCreationManager.copyPart(partNumber, sourceFileId, byteRangeOrNull, uploadListener, cancellationToken);
}

@Override
Expand Down
Loading