Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Store the resources in S3 buckets #611

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bundles/sirix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
api implLibraries.iouring
api implLibraries.lz4
api implLibraries.roaringbitmap
api implLibraries.amazonS3

implementation implLibraries.snappyJava
implementation implLibraries.browniesCollections
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.sirix.io.cloud;

/**
 * The cloud providers on which SirixDB resources may be stored.
 */
public enum CloudPlatform {

  /** Amazon Web Services (S3). */
  AWS,

  /** Google Cloud Platform. */
  GCP,

  /** Microsoft Azure. */
  AZURE

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.sirix.io.cloud;

import org.sirix.io.IOStorage;

/**
 * Marker interface for cloud-based storage backends.
 *
 * <p>Currently adds no members beyond {@link IOStorage}; it exists so that
 * cloud implementations (e.g. the Amazon S3 backend) can be distinguished
 * from local ones by type.</p>
 */
public interface ICloudStorage extends IOStorage {



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package org.sirix.io.cloud.amazon;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.sirix.access.ResourceConfiguration;
import org.sirix.io.Reader;
import org.sirix.io.RevisionFileData;
import org.sirix.io.Writer;
import org.sirix.io.bytepipe.ByteHandler;
import org.sirix.io.bytepipe.ByteHandlerPipeline;
import org.sirix.io.cloud.ICloudStorage;
import org.sirix.page.PagePersister;
import org.sirix.page.SerializationType;
import org.sirix.utils.LogWrapper;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.AsyncCache;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;

/**
 * Factory to provide Amazon S3 as storage backend.
 *
 * <p>Authentication is supported only via the local {@code .aws} credentials
 * file, using the profile name supplied at construction time.</p>
 *
 * @author Sanket Band (@sband)
 **/
public final class AmazonS3Storage implements ICloudStorage {

  /**
   * Data file name.
   */
  private static final String FILENAME = "sirix.data";

  /**
   * Revisions file name.
   */
  private static final String REVISIONS_FILENAME = "sirix.revisions";

  /** Logger. */
  private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3Storage.class));

  /**
   * Path of the resource on the local storage.
   */
  private final Path file;

  /**
   * S3 storage bucket name (by convention the same as the database name).
   */
  private final String bucketName;

  /**
   * Profile name within the local AWS credentials file.
   */
  private final String awsProfile;

  /**
   * AWS region identifier the bucket lives in.
   */
  private final String region;

  /**
   * Synchronous S3 client, built once in the constructor and reused.
   */
  private final S3Client s3Client;

  /**
   * Byte handler pipeline.
   */
  private final ByteHandlerPipeline byteHandlerPipeline;

  /**
   * Revision file data cache.
   */
  private final AsyncCache<Integer, RevisionFileData> cache;

  /**
   * Shared reader instance handed out by {@link #createReader()}.
   */
  private final AmazonS3StorageReader reader;

  /**
   * Support AWS authentication only with .aws credentials file with the required
   * profile name from the creds file.
   *
   * @param bucketName name of the S3 bucket (same as the database name)
   * @param awsProfile profile name within the local AWS credentials file
   * @param region AWS region identifier
   * @param shouldCreateBucketIfNotExists if {@code true}, create the bucket when it does not exist
   * @param resourceConfig resource configuration supplying local path and byte handler pipeline
   * @param cache revision file data cache
   * @throws IllegalStateException if the bucket is missing and cannot be created
   */
  public AmazonS3Storage(String bucketName, String awsProfile,
      String region,
      boolean shouldCreateBucketIfNotExists, final ResourceConfiguration resourceConfig,
      AsyncCache<Integer, RevisionFileData> cache) {
    this.bucketName = bucketName;
    this.awsProfile = awsProfile;
    this.region = region;
    this.cache = cache;
    this.byteHandlerPipeline = resourceConfig.byteHandlePipeline;
    this.file = resourceConfig.resourcePath;
    // The synchronous client is needed for the bucket checks below, so build it eagerly.
    this.s3Client = S3Client.builder()
        .region(Region.of(region))
        .credentialsProvider(ProfileCredentialsProvider.create(awsProfile))
        .build();
    if (!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) {
      createBucket(bucketName);
    }
    this.reader = createStorageReader();
  }

  /**
   * Creates the S3 bucket and waits until S3 reports it as existing
   * (bucket creation is asynchronous on the server side).
   *
   * @param bucketName name of the bucket to create
   * @throws IllegalStateException if the bucket could not be created
   */
  void createBucket(String bucketName) {
    try {
      S3Waiter s3Waiter = s3Client.waiter();
      CreateBucketRequest bucketRequest = CreateBucketRequest.builder().bucket(bucketName).build();

      s3Client.createBucket(bucketRequest);
      HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder().bucket(bucketName).build();

      WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait);
      if (waiterResponse.matched().response().isPresent()) {
        LOGGER.info(String.format("S3 bucket: %s has been created.", bucketName));
      }
    } catch (S3Exception e) {
      LOGGER.error(e.awsErrorDetails().errorMessage());
      // Fail fast with an exception instead of System.exit(1): library code must
      // never terminate the host JVM, and this preserves the root cause.
      throw new IllegalStateException(
          String.format("Bucket: %s could not be created. Will not consume S3 storage", bucketName), e);
    }
  }

  /**
   * Probes for the bucket with a {@code HeadBucket} request.
   *
   * @param bucketName name of the bucket to check
   * @return {@code true} if the bucket exists, {@code false} otherwise
   */
  boolean isBucketExists(String bucketName) {
    HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build();

    try {
      s3Client.headBucket(headBucketRequest);
      return true;
    } catch (NoSuchBucketException e) {
      return false;
    }
  }

  /**
   * @return the synchronous S3 client created in the constructor
   */
  S3Client getS3Client() {
    return this.s3Client;
  }

  /**
   * @return a freshly built asynchronous S3 client (used by writers)
   */
  S3AsyncClient getAsyncS3Client() {
    return S3AsyncClient.builder()
        .region(Region.of(region))
        .credentialsProvider(ProfileCredentialsProvider.create(awsProfile))
        .build();
  }

  /**
   * Builds a reader over the data and revisions objects of this resource.
   * Used both for the shared reader and for per-writer readers.
   */
  private AmazonS3StorageReader createStorageReader() {
    return new AmazonS3StorageReader(bucketName,
        s3Client,
        getDataFilePath().toAbsolutePath().toString(),
        getRevisionFilePath().toAbsolutePath().toString(),
        new ByteHandlerPipeline(byteHandlerPipeline),
        SerializationType.DATA,
        new PagePersister(),
        cache.synchronous());
  }

  @Override
  public Writer createWriter() {
    // Each writer gets its own reader so that closing a writer cannot affect
    // the shared reader handed out by createReader().
    return new AmazonS3StorageWriter(getDataFilePath().toAbsolutePath().toString(),
        getRevisionFilePath().toAbsolutePath().toString(),
        bucketName,
        SerializationType.DATA,
        new PagePersister(),
        cache,
        createStorageReader(),
        this.getAsyncS3Client());
  }

  @Override
  public Reader createReader() {
    return this.reader;
  }

  @Override
  public void close() {
    // NOTE(review): intentionally a no-op, matching the original implementation.
    // The S3 client is closed by the reader (see AmazonS3StorageReader#close).
  }

  @Override
  public boolean exists() {
    // Downloads the data object to a local file and checks it is non-empty.
    Path storage = this.reader.readObjectDataFromS3(getDataFilePath().toAbsolutePath().toString());
    try {
      return Files.exists(storage) && Files.size(storage) > 0;
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public ByteHandler getByteHandler() {
    return this.byteHandlerPipeline;
  }

  /**
   * Getting path for data file.
   * This path would be used on the local storage.
   *
   * @return the path for this data file
   */
  private Path getDataFilePath() {
    return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME);
  }

  /**
   * Getting path for the revisions file.
   * This path would be used on the local storage.
   *
   * @return the path for the revisions file
   */
  private Path getRevisionFilePath() {
    return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.sirix.io.cloud.amazon;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.time.Instant;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.sirix.api.PageReadOnlyTrx;
import org.sirix.io.Reader;
import org.sirix.io.RevisionFileData;
import org.sirix.io.bytepipe.ByteHandler;
import org.sirix.io.file.FileReader;
import org.sirix.page.PagePersister;
import org.sirix.page.PageReference;
import org.sirix.page.RevisionRootPage;
import org.sirix.page.SerializationType;
import org.sirix.page.interfaces.Page;
import org.sirix.utils.LogWrapper;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.hash.HashFunction;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class AmazonS3StorageReader implements Reader {

/**
* S3 storage bucket name
*
*/
private String bucketName;

private S3Client s3Client;

/** Logger. */
private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class));


private FileReader reader;

public AmazonS3StorageReader(String bucketName,
S3Client s3Client,
String dataFileKeyName,
String revisionsOffsetFileKeyName,
final ByteHandler byteHandler,
final SerializationType serializationType,
final PagePersister pagePersister,
final Cache<Integer, RevisionFileData> cache) {
this.bucketName = bucketName;
this.s3Client = s3Client;
Path dataFilePath = readObjectDataFromS3(dataFileKeyName);
Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName);
try {
this.reader = new FileReader(new RandomAccessFile(dataFilePath.toFile(), "r"),
new RandomAccessFile(revisionOffsetFilePath.toFile(), "r"),
byteHandler,
serializationType,
pagePersister,
cache);
}catch(IOException io) {
LOGGER.error(io.getMessage());
System.exit(1);
}

}

/**
* @param keyName - Key name of the object to be read from S3 storage
* @return path - The location of the local file that contains the data that is written to the file system storage
* in the system temp directory.
*/
protected Path readObjectDataFromS3(String keyName) {

try {
GetObjectRequest objectRequest = GetObjectRequest
.builder()
.key(keyName)
.bucket(bucketName)
.build();

ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(objectRequest);
byte[] data = objectBytes.asByteArray();
/*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem
* instead of in the tmp partition*/
String path = FileSystems.getDefault().getSeparator() + bucketName + FileSystems.getDefault().getSeparator() + keyName;
sband marked this conversation as resolved.
Show resolved Hide resolved
// Write the data to a local file.
File myFile = new File(path);
try(OutputStream os = new FileOutputStream(myFile)){
os.write(data);
}
return Path.of(path);
} catch (IOException ex) {
ex.printStackTrace();
} catch (S3Exception e) {
LOGGER.error(e.awsErrorDetails().errorMessage());
System.exit(1);
}
return null;
}

ByteHandler getByteHandler() {
return this.reader.getByteHandler();
}

HashFunction getHashFunction() {
return this.reader.getHashFunction();
}

@Override
public PageReference readUberPageReference() {
return reader.readUberPageReference();
}

@Override
public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) {
return reader.read(key, pageReadTrx);
}

@Override
public void close() {
s3Client.close();
reader.close();
}

@Override
public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) {
return reader.readRevisionRootPage(revision, pageReadTrx);
}

@Override
public Instant readRevisionRootPageCommitTimestamp(int revision) {
return reader.readRevisionRootPageCommitTimestamp(revision);
}

@Override
public RevisionFileData getRevisionFileData(int revision) {
return reader.getRevisionFileData(revision);
}

}
Loading