Skip to content

Commit

Permalink
Merge pull request #54 from jpmorganchase/release
Browse files Browse the repository at this point in the history
Completion of 0.0.8 release
  • Loading branch information
knighto82 authored Oct 19, 2023
2 parents 988745a + a25f3af commit 0ae04ef
Show file tree
Hide file tree
Showing 41 changed files with 1,601 additions and 1,140 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The Fusion SDK is published to Maven Central and can be retrieved from there usi
<dependency>
<groupId>io.github.jpmorganchase.fusion</groupId>
<artifactId>fusion-sdk</artifactId>
<version>0.0.7</version>
<version>0.0.8</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>io.github.jpmorganchase.fusion</groupId>
<artifactId>fusion-sdk</artifactId>
<packaging>jar</packaging>
<version>0.0.7</version>
<version>0.0.8</version>

<name>fusion-sdk</name>
<description>A Java SDK for the Fusion platform API</description>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.github.jpmorganchase.fusion.packaging;

import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.io.FileReader;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

/**
* This test will run under failsafe after the JAR has been built for the project
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/github/jpmorganchase/fusion/Fusion.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package io.github.jpmorganchase.fusion;

import io.github.jpmorganchase.fusion.api.*;
import io.github.jpmorganchase.fusion.api.APIManager;
import io.github.jpmorganchase.fusion.api.FusionAPIManager;
import io.github.jpmorganchase.fusion.api.exception.APICallException;
import io.github.jpmorganchase.fusion.api.exception.ApiInputValidationException;
import io.github.jpmorganchase.fusion.api.exception.FileDownloadException;
import io.github.jpmorganchase.fusion.api.exception.FileUploadException;
import io.github.jpmorganchase.fusion.http.Client;
import io.github.jpmorganchase.fusion.http.JdkClient;
import io.github.jpmorganchase.fusion.model.*;
import io.github.jpmorganchase.fusion.oauth.credential.*;
import io.github.jpmorganchase.fusion.oauth.credential.BearerTokenCredentials;
import io.github.jpmorganchase.fusion.oauth.credential.Credentials;
import io.github.jpmorganchase.fusion.oauth.credential.OAuthPasswordBasedCredentials;
import io.github.jpmorganchase.fusion.oauth.credential.OAuthSecretBasedCredentials;
import io.github.jpmorganchase.fusion.oauth.exception.OAuthException;
import io.github.jpmorganchase.fusion.oauth.provider.*;
import io.github.jpmorganchase.fusion.oauth.provider.DefaultFusionTokenProvider;
import io.github.jpmorganchase.fusion.oauth.provider.FusionTokenProvider;
import io.github.jpmorganchase.fusion.parsing.APIResponseParser;
import io.github.jpmorganchase.fusion.parsing.GsonAPIResponseParser;
import io.github.jpmorganchase.fusion.parsing.ParsingException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import io.github.jpmorganchase.fusion.api.response.UploadedParts;
import io.github.jpmorganchase.fusion.model.Operation;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import lombok.*;

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package io.github.jpmorganchase.fusion.api.operations;

import static io.github.jpmorganchase.fusion.api.tools.ResponseChecker.checkResponseStatus;

import io.github.jpmorganchase.fusion.FusionConfiguration;
import io.github.jpmorganchase.fusion.FusionException;
import io.github.jpmorganchase.fusion.api.exception.APICallException;
import io.github.jpmorganchase.fusion.api.exception.FileDownloadException;
import io.github.jpmorganchase.fusion.api.request.DownloadRequest;
import io.github.jpmorganchase.fusion.api.request.*;
import io.github.jpmorganchase.fusion.api.response.GetPartResponse;
import io.github.jpmorganchase.fusion.api.response.Head;
import io.github.jpmorganchase.fusion.api.stream.IntegrityCheckingInputStream;
import io.github.jpmorganchase.fusion.api.stream.DeferredMultiPartInputStream;
import io.github.jpmorganchase.fusion.http.Client;
import io.github.jpmorganchase.fusion.http.HttpResponse;
import io.github.jpmorganchase.fusion.oauth.exception.OAuthException;
import io.github.jpmorganchase.fusion.oauth.provider.FusionTokenProvider;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -34,13 +30,9 @@ public class FusionAPIDownloadOperations implements APIDownloadOperations {
private static final String DOWNLOAD_FAILED_EXCEPTION_MSG =
"Problem encountered attempting to download distribution";

private static final String HEAD_PATH = "%s/operationType/download";
private static final String HEAD_PATH_FOR_PART = "%s?downloadPartNumber=%d";

private static final String FILE_RW_MODE = "rw";

private final Client httpClient;
private final FusionTokenProvider fusionTokenProvider;
private PartFetcher partFetcher;

/**
* Size of Thread-Pool to be used for uploading chunks of a multipart file
Expand Down Expand Up @@ -112,6 +104,7 @@ protected void downloadToFile(DownloadRequest dr) {
protected void performMultiPartDownloadToFile(DownloadRequest dr, Head head) {

ExecutorService executor = getExecutor();

try (RandomAccessFile raf = new RandomAccessFile(dr.getFilePath(), FILE_RW_MODE)) {

raf.setLength(head.getContentLength());
Expand All @@ -121,7 +114,10 @@ protected void performMultiPartDownloadToFile(DownloadRequest dr, Head head) {
final int part = p;
futures.add(CompletableFuture.runAsync(
() -> {
GetPartResponse getPartResponse = callToAPIToGetPart(dr, part);
GetPartResponse getPartResponse = partFetcher.fetch(PartRequest.builder()
.partNo(part)
.downloadRequest(dr)
.build());
writePartToFile(getPartResponse, raf);
},
executor));
Expand All @@ -135,13 +131,13 @@ protected void performMultiPartDownloadToFile(DownloadRequest dr, Head head) {
} finally {
executor.shutdown();
}
log.info("Distribution downloaded to file {}", dr.getFilePath());
}

private void writePartToFile(GetPartResponse gpr, RandomAccessFile raf) {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (InputStream input =
IntegrityCheckingInputStream.builder().part(gpr).build()) {
try (InputStream input = gpr.getContent()) {

byte[] buffer = new byte[8192];
int bytesRead;
Expand All @@ -159,25 +155,7 @@ private void writePartToFile(GetPartResponse gpr, RandomAccessFile raf) {
}
}

protected GetPartResponse callToAPIToGetPart(DownloadRequest dr, int partNumber) {

String getPartPath = getPathForHeadAndGet(dr, partNumber);

Map<String, String> requestHeaders = new HashMap<>();
setSecurityHeaders(dr, requestHeaders);

HttpResponse<InputStream> response = httpClient.getInputStream(getPartPath, requestHeaders);

checkResponseStatus(response);

return GetPartResponse.builder()
.content(response.getBody())
.head(Head.builder().fromHeaders(response.getHeaders()).build())
.build();
}

public void performSinglePartDownloadToFile(DownloadRequest dr, Head head) throws APICallException {

try (InputStream input = performSinglePartDownloadToStream(dr, head)) {
try (FileOutputStream fileOutput = new FileOutputStream(dr.getFilePath())) {
byte[] buf = new byte[8192];
Expand All @@ -189,6 +167,7 @@ public void performSinglePartDownloadToFile(DownloadRequest dr, Head head) throw
} catch (IOException e) {
throw new FileDownloadException(WRITE_TO_FILE_EXCEPTION_MSG, e);
}
log.info("Distribution downloaded to file {}", dr.getFilePath());
}

protected InputStream downloadToStream(DownloadRequest dr) {
Expand All @@ -200,75 +179,42 @@ protected InputStream downloadToStream(DownloadRequest dr) {
}
}

protected InputStream performMultiPartDownloadToStream(DownloadRequest dr, Head head) {
private Head callAPIToGetHead(DownloadRequest dr) {
return partFetcher
.fetch(PartRequest.builder().partNo(0).downloadRequest(dr).build())
.getHead();
}

ExecutorService executor = getExecutor();
List<CompletableFuture<GetPartResponse>> futures = new ArrayList<>();
protected InputStream performMultiPartDownloadToStream(DownloadRequest dr, Head head) {

LinkedList<CallablePart> parts = new LinkedList<>();
try {

for (int p = 1; p <= head.getPartCount(); p++) {
final int part = p;
futures.add(CompletableFuture.supplyAsync(() -> callToAPIToGetPart(dr, part), executor));
parts.add(CallablePart.builder()
.partNo(p)
.partFetcher(partFetcher)
.downloadRequest(dr)
.build());
}

List<GetPartResponse> parts = futures.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(gpr -> gpr.getHead().getPartCount()))
.collect(Collectors.toList());

return IntegrityCheckingInputStream.builder().parts(parts).build();
return DeferredMultiPartInputStream.builder()
.parts(CallableParts.builder().parts(parts).build())
.build();

} catch (CompletionException | CancellationException | IOException e) {
} catch (IOException e) {
throw handleExceptionThrownWhenAttemptingToGetParts(e);
} finally {
executor.shutdown();
}
}

public InputStream performSinglePartDownloadToStream(DownloadRequest dr, Head head) throws APICallException {
Map<String, String> requestHeaders = new HashMap<>();
setSecurityHeaders(dr, requestHeaders);

HttpResponse<InputStream> response = httpClient.getInputStream(dr.getApiPath(), requestHeaders);

checkResponseStatus(response);
try {
return IntegrityCheckingInputStream.builder()
.part(GetPartResponse.builder()
.content(response.getBody())
.head(head)
.build())
.build();
} catch (IOException ex) {
throw handleExceptionThrownWhenAttemptingToGetParts(ex);
}
}

/**
* Returns the Head object representing the entire file
*
* @param dr {@link DownloadRequest}
* @return {@link Head} object describing the file
*/
protected Head callAPIToGetHead(DownloadRequest dr) {
String headPath = getPathForHeadAndGet(dr, 0);

Map<String, String> requestHeaders = new HashMap<>();
setSecurityHeaders(dr, requestHeaders);

HttpResponse<InputStream> headResponse = httpClient.getInputStream(headPath, requestHeaders);
checkResponseStatus(headResponse);

return Head.builder().fromHeaders(headResponse.getHeaders()).build();
}

private String getPathForHeadAndGet(DownloadRequest dr, int partNumber) {
String headPath = String.format(HEAD_PATH, dr.getApiPath());
if (partNumber > 0) {
headPath = String.format(HEAD_PATH_FOR_PART, headPath, partNumber);
}
return headPath;
return partFetcher
.fetch(PartRequest.builder()
.partNo(1)
.head(head)
.downloadRequest(dr)
.build())
.getContent();
}

private FusionException handleExceptionThrownWhenAttemptingToGetParts(Exception ex) {
Expand All @@ -284,13 +230,6 @@ private ExecutorService getExecutor() {
return Executors.newFixedThreadPool(downloadThreadPoolSize);
}

private void setSecurityHeaders(DownloadRequest dr, Map<String, String> requestHeaders) {
requestHeaders.put("Authorization", "Bearer " + fusionTokenProvider.getSessionBearerToken());
requestHeaders.put(
"Fusion-Authorization",
"Bearer " + fusionTokenProvider.getDatasetBearerToken(dr.getCatalog(), dr.getDataset()));
}

public static FusionAPIDownloadOperationsBuilder builder() {
return new CustomFusionAPIDownloadOperationsBuilder();
}
Expand All @@ -301,23 +240,50 @@ public static class FusionAPIDownloadOperationsBuilder {
FusionConfiguration.builder().build();

int downloadThreadPoolSize;
Client httpClient;

FusionTokenProvider fusionTokenProvider;

PartFetcher partFetcher;

public FusionAPIDownloadOperationsBuilder configuration(FusionConfiguration configuration) {
this.configuration = configuration;
return this;
}

public FusionAPIDownloadOperationsBuilder httpClient(Client httpClient) {
this.httpClient = httpClient;
return this;
}

public FusionAPIDownloadOperationsBuilder fusionTokenProvider(FusionTokenProvider fusionTokenProvider) {
this.fusionTokenProvider = fusionTokenProvider;
return this;
}

@SuppressWarnings("PIT")
private FusionAPIDownloadOperationsBuilder downloadThreadPoolSize(int downloadThreadPoolSize) {
this.downloadThreadPoolSize = downloadThreadPoolSize;
return this;
}

public FusionAPIDownloadOperationsBuilder partFetcher(PartFetcher partFetcher) {
this.partFetcher = partFetcher;
return this;
}
}

private static class CustomFusionAPIDownloadOperationsBuilder extends FusionAPIDownloadOperationsBuilder {
@Override
public FusionAPIDownloadOperations build() {
this.downloadThreadPoolSize = configuration.getDownloadThreadPoolSize();

if (Objects.isNull(partFetcher))
this.partFetcher = PartFetcher.builder()
.client(httpClient)
.credentials(fusionTokenProvider)
.build();

return super.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.jpmorganchase.fusion.api.operations;

import static io.github.jpmorganchase.fusion.api.tools.ResponseChecker.*;
import static io.github.jpmorganchase.fusion.api.tools.ResponseChecker.checkResponseStatus;

import com.google.gson.GsonBuilder;
import io.github.jpmorganchase.fusion.FusionConfiguration;
Expand All @@ -26,7 +26,10 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Builder;
import lombok.Getter;

Expand Down
Loading

0 comments on commit 0ae04ef

Please sign in to comment.