Merging main (0.0.13) into develop. (#70)
* Feature/headers in download request (#68)

* Tweaks to logic ensuring the buffer is filled for each read operation.
knighto82 authored Nov 18, 2024
1 parent 0873ecb commit 46ab2d0
Showing 4 changed files with 23 additions and 53 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -20,7 +20,7 @@ The Fusion SDK is published to Maven Central and can be retrieved from there usi
<dependency>
<groupId>io.github.jpmorganchase.fusion</groupId>
<artifactId>fusion-sdk</artifactId>
<version>0.0.12-SNAPSHOT</version>
<version>0.0.13</version>
</dependency>
```

@@ -109,7 +109,6 @@ Fusion fusion = Fusion.builder().configuration(FusionConfiguration.builder()
* _downloadPath_ - Configures the path to which distributions are downloaded. Defaults to "downloads".
* _singlePartUploadSizeLimit_ - Max size in MB of data allowed for a single-part upload; for example, if the limit should be 32MB, set this to 32. Defaults to 50.
* _uploadPartSize_ - Upload part chunk size in MB; for example, if 8MB parts are required, set this to 8. Defaults to 16MB.
* _maxInFluxDataSize_ - Max in flux data to be read at a given time. Use this to protect data read to heap during upload. If a value such as 100MB is required, set to 100. Defaults to 500MB.
* _uploadThreadPoolSize_ - Size of the thread pool used for uploading chunks of a multipart file. Defaults to the number of available processors.
* _downloadThreadPoolSize_ - Size of the thread pool used for downloading chunks of a multipart file. Defaults to the number of available processors.
* _digestAlgorithm_ - Digest algorithm used by Fusion to verify the integrity of uploads and downloads. Defaults to SHA-256. A configuration sketch follows this list.
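The following is an illustrative sketch, not part of the original README: it assumes the Lombok-generated builder setters match the option names above and that the digest algorithm is passed as a string; the values are placeholders and authentication setup is omitted.

```java
// Hedged example: wiring the options above into a client.
Fusion fusion = Fusion.builder()
        .configuration(FusionConfiguration.builder()
                .downloadPath("downloads")          // where distributions are written
                .singlePartUploadSizeLimit(50)      // MB; larger payloads use multipart upload
                .uploadPartSize(16)                 // MB per multipart chunk
                .uploadThreadPoolSize(8)            // threads uploading chunks
                .downloadThreadPoolSize(8)          // threads downloading chunks
                .digestAlgorithm("SHA-256")         // integrity check for transfers
                .build())
        .build();
```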
9 changes: 4 additions & 5 deletions pom.xml
@@ -6,7 +6,7 @@
<groupId>io.github.jpmorganchase.fusion</groupId>
<artifactId>fusion-sdk</artifactId>
<packaging>jar</packaging>
<version>0.0.12-SNAPSHOT</version>
<version>0.0.13</version>

<name>fusion-sdk</name>
<description>A Java SDK for the Fusion platform API</description>
@@ -45,7 +45,7 @@

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<id>central</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
@@ -397,10 +397,9 @@
<artifactId>nexus-staging-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<serverId>central</serverId>
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
<!-- this can be changed to true once we are comfortable with the release process being correct -->
<autoReleaseAfterClose>false</autoReleaseAfterClose>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
</plugins>
FusionConfiguration.java
@@ -51,13 +51,6 @@ public class FusionConfiguration {
@Builder.Default
int uploadPartSize = 8;

/**
* Max in flux data to be read at a given time. Defaults to 500MB.
* If a value such as 1gb is required, then client would set this value to 1000;
*/
@Builder.Default
long maxInFluxDataSize = 500;

/**
* Size of Thread-Pool to be used for uploading chunks of a multipart file
* Defaults to number of available processors.
FusionAPIUploadOperations.java
@@ -27,10 +27,7 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import lombok.Builder;
import lombok.Getter;
import org.slf4j.Logger;
@@ -82,12 +79,6 @@ public class FusionAPIUploadOperations implements APIUploadOperations {
*/
int uploadThreadPoolSize;

/**
* Max size of in-flux data that can be read at a given time.
* See {@link FusionConfiguration} for default values.
*/
long maxInFluxDataSize;

/**
* Call the API upload endpoint to load a distribution
*
@@ -224,19 +215,26 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques
protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) {

int chunkSize = uploadPartSize * (1024 * 1024);
long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L);

byte[] buffer = new byte[chunkSize];
int partCnt = 1;
int totalBytes = 0;
int inFluxBytes = 0;

ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize);
Semaphore semaphore = new Semaphore(uploadThreadPoolSize);

try {
List<CompletableFuture<Void>> futures = new ArrayList<>();

int bytesRead;
while ((bytesRead = ur.getData().read(buffer)) != -1) {
semaphore.acquire();

while (bytesRead < buffer.length) {
int tempBytesRead = ur.getData().read(buffer, bytesRead, (chunkSize - bytesRead));
if (-1 == tempBytesRead) break;
bytesRead += tempBytesRead;
}

logger.debug(
"Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead);
@@ -245,18 +243,19 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
final int currentBytesRead = bytesRead;
byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead);

if (inFluxBytes > maxInFluxBytes) {
inFluxBytes = easeDataPressure(futures);
}

futures.add(CompletableFuture.runAsync(
() -> mtx.partUploaded(
callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)),
() -> {
try {
mtx.partUploaded(
callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt));
} finally {
semaphore.release();
}
},
executor));

partCnt++;
totalBytes += bytesRead;
inFluxBytes += bytesRead;
}

for (CompletableFuture<Void> future : futures) {
@@ -272,18 +271,6 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
return mtx.transferred(chunkSize, totalBytes, partCnt);
}

private int easeDataPressure(List<CompletableFuture<Void>> futures)
throws InterruptedException, ExecutionException {

logger.debug("Reached max in-flux bytes - easing pressure");
for (CompletableFuture<Void> future : futures) {
future.get();
}
logger.debug("Max in-flux bytes handled - pressure eased");
futures.clear();
return 0;
}

protected UploadedPartContext callAPIToUploadPart(
MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) {

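With `maxInFluxDataSize` and `easeDataPressure` removed, backpressure now comes from a `Semaphore` sized to the upload thread pool: the reading thread blocks once that many parts are in flight, so only a bounded number of chunk buffers sit on the heap at once. A self-contained sketch of that pattern follows; it is illustrative only and not the SDK's class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Sketch of semaphore-bounded task submission: acquire a permit before
// submitting each part and release it when the part completes, so the producer
// can never get more than poolSize parts ahead of the workers.
public class BoundedSubmissionSketch {
    public static void main(String[] args) throws Exception {
        int poolSize = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(poolSize);
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (int part = 1; part <= 10; part++) {
            permits.acquire(); // blocks once poolSize parts are in flight
            final int partNo = part;
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("uploading part " + partNo); // stand-in for the real upload call
                } finally {
                    permits.release(); // free the slot whether the upload succeeded or failed
                }
            }, executor));
        }

        for (CompletableFuture<Void> future : futures) {
            future.get(); // surface any failure from the uploads
        }
        executor.shutdown();
    }
}
```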
@@ -389,7 +376,6 @@ public static class FusionAPIUploadOperationsBuilder {
int singlePartUploadSizeLimit;
int uploadPartSize;
int uploadThreadPoolSize;
long maxInFluxDataSize;

public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) {
this.configuration = configuration;
@@ -413,12 +399,6 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo
this.uploadThreadPoolSize = uploadThreadPoolSize;
return this;
}

@SuppressWarnings("PIT")
private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) {
this.maxInFluxDataSize = maxInFluxDataSize;
return this;
}
}

private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder {
@@ -427,7 +407,6 @@ public FusionAPIUploadOperations build() {
this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit();
this.uploadPartSize = configuration.getUploadPartSize();
this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize();
this.maxInFluxDataSize = configuration.getMaxInFluxDataSize();

if (Objects.isNull(digestProducer)) {
this.digestProducer = AlgoSpecificDigestProducer.builder()
