Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base64 encode the checksum #639

Merged
merged 1 commit into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ abstract class Containers {

static {
LOCAL_STACK_CONTAINER = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:2.3.2")
).withServices(S3).withEnv("PROVIDER_OVERRIDE_S3", "v3");
DockerImageName.parse("localstack/localstack:4.2")
).withServices(S3);
LOCAL_STACK_CONTAINER.start();
System.setProperty(S3_SPI_ENDPOINT_PROTOCOL_PROPERTY, "http");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import software.amazon.nio.spi.s3.config.S3NioSpiConfiguration;

@DisplayName("Files$newByteChannel* should read and write on S3")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FilesNewByteChannelTest {
Expand Down Expand Up @@ -76,34 +74,58 @@ public void newByteChannel_READ_WRITE() throws IOException {
assertThat(path).hasContent(text);
}

@Test
@DisplayName("newByteChannel with CRC32 integrity check")
public void newByteChannel_withIntegrityCheck_CRC32() throws Exception {
String text = "we test the integrity check when closing the byte channel";

withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "CRC32").execute(() -> {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}

assertThat(path).hasContent(text);
});
}

@Test
@DisplayName("newByteChannel with CRC32C integrity check")
public void newByteChannel_withIntegrityCheck_CRC32C() throws Exception {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));

String text = "we test the integrity check when closing the byte channel";
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC32C").execute(() -> {

withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "CRC32C").execute(() -> {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}
});

assertThat(path).hasContent(text);
assertThat(path).hasContent(text);
});
}

@Test
@DisplayName("newByteChannel with CRC64NVME integrity check")
public void newByteChannel_withIntegrityCheck_CRC64NVME() throws Exception {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));

String text = "we test the integrity check when closing the byte channel";
withEnvironmentVariable(S3NioSpiConfiguration.S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC64NVME").execute(() -> {

withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "CRC64NVME").execute(() -> {
var path = (S3Path) Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/bc-integrity-check.txt"));
try (var channel = Files.newByteChannel(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
channel.write(ByteBuffer.wrap(text.getBytes()));
}

assertThat(path).hasContent(text);
});
}

assertThat(path).hasContent(text);
@Test
@DisplayName("newByteChannel with invalid integrity check")
public void newByteChannel_withIntegrityCheck_invalid() throws Exception {
withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "invalid").execute(() -> {
var path = Paths.get(URI.create(localStackConnectionEndpoint() + "/" + bucketName + "/int-check-algo-test.txt"));
assertThatThrownBy(() -> Files.newByteChannel(path)).hasMessage("unknown integrity check algorithm 'invalid'");
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import software.amazon.awssdk.crt.checksums.CRC32;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.BinaryUtils;

class Crc32FileIntegrityCheck implements S3ObjectIntegrityCheck {
private final byte[] buffer = new byte[16 * 1024];
private final CRC32 checksum = new CRC32();
private final ByteBuffer checksumBuffer = ByteBuffer.allocate(Integer.BYTES);

@Override
public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
checksum.reset();
checksumBuffer.clear();
try (var in = Files.newInputStream(file)) {
int len;
while ((len = in.read(buffer)) != -1) {
checksum.update(buffer, 0, len);
}
checksumBuffer.putInt((int) checksum.getValue());
builder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
builder.checksumCRC32(BinaryUtils.toBase64(checksumBuffer.array()));
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import software.amazon.awssdk.crt.checksums.CRC32C;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.internal.Base16;
import software.amazon.awssdk.utils.BinaryUtils;

class Crc32cFileIntegrityCheck implements S3ObjectIntegrityCheck {
private final byte[] buffer = new byte[16 * 1024];
Expand All @@ -31,7 +31,7 @@ public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
}
checksumBuffer.putInt((int) checksum.getValue());
builder.checksumAlgorithm(ChecksumAlgorithm.CRC32_C);
builder.checksumCRC32C(Base16.encodeAsString(checksumBuffer.array()));
builder.checksumCRC32C(BinaryUtils.toBase64(checksumBuffer.array()));
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import software.amazon.awssdk.crt.checksums.CRC64NVME;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.internal.Base16;
import software.amazon.awssdk.utils.BinaryUtils;

class Crc64nvmeFileIntegrityCheck implements S3ObjectIntegrityCheck {
private final byte[] buffer = new byte[16 * 1024];
Expand All @@ -31,7 +31,7 @@ public void addChecksumToRequest(Path file, PutObjectRequest.Builder builder) {
}
checksumBuffer.putLong(checksum.getValue());
builder.checksumAlgorithm(ChecksumAlgorithm.CRC64_NVME);
builder.checksumCRC64NVME(Base16.encodeAsString(checksumBuffer.array()));
builder.checksumCRC64NVME(BinaryUtils.toBase64(checksumBuffer.array()));
} catch (IOException cause) {
throw new UncheckedIOException(cause);
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/software/amazon/nio/spi/s3/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public void close() throws IOException {
*/
S3ObjectIntegrityCheck integrityCheck() {
var algorithm = configuration.getIntegrityCheckAlgorithm();
if (algorithm.equalsIgnoreCase("CRC32")) {
return new Crc32FileIntegrityCheck();
}
if (algorithm.equalsIgnoreCase("CRC32C")) {
return new Crc32cFileIntegrityCheck();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,9 @@ boolean exists(S3AsyncClient s3Client, S3Path path) throws InterruptedException,
s3Client.headObject(HeadObjectRequest.builder().bucket(path.bucketName()).key(path.getKey()).build())
.get(configuration.getTimeoutLow(), MINUTES);
return true;
} catch (ExecutionException | NoSuchKeyException e) {
} catch (NoSuchKeyException e) {
return false;
} catch (ExecutionException e) {
logger.debug("Could not retrieve object head information", e);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,6 +107,11 @@ public class S3NioSpiConfiguration extends HashMap<String, Object> {
* The default value of the S3 object integrity check property
*/
public static final String S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT = "disabled";
/**
* Allowed algorithms of the S3 object integrity check property
*/
public static final Set<String> S3_INTEGRITY_CHECK_ALGORITHM_ALLOWED = Set.of(
"CRC32", "CRC32C", "CRC64NVME", S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT.toUpperCase());

private static final Pattern ENDPOINT_REGEXP = Pattern.compile("(\\w[\\w\\-\\.]*)?(:(\\d+))?");

Expand Down Expand Up @@ -403,6 +409,7 @@ public S3NioSpiConfiguration withIntegrityCheckAlgorithm(String algorithm) {
} else {
put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, algorithm);
}
validateIntegrityAlgorithm(algorithm);

return this;
}
Expand Down Expand Up @@ -542,7 +549,15 @@ public Long getTimeoutHigh() {
* @return the configured value or the default if not overridden
*/
public String getIntegrityCheckAlgorithm() {
return (String) getOrDefault(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
String algorithm = (String) getOrDefault(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, S3_INTEGRITY_CHECK_ALGORITHM_DEFAULT);
validateIntegrityAlgorithm(algorithm);
return algorithm;
}

private void validateIntegrityAlgorithm(String algorithm) {
if (!S3_INTEGRITY_CHECK_ALGORITHM_ALLOWED.contains(algorithm.toUpperCase())) {
throw new UnsupportedOperationException("unknown integrity check algorithm '" + algorithm + "'");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.nio.spi.s3;

import static java.nio.file.StandardOpenOption.*;
import static org.assertj.core.api.Assertions.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;

class Crc32FileIntegrityCheckTest {

@Test
void test(@TempDir Path tempDir) throws IOException {
var integrityCheck = new Crc32FileIntegrityCheck();
var file = tempDir.resolve("test");
Files.writeString(file, "hello world!", CREATE_NEW);
var putObjectRequest = PutObjectRequest.builder();
integrityCheck.addChecksumToRequest(file, putObjectRequest);
assertThat(putObjectRequest.build().checksumCRC32()).isEqualTo("A7TCbQ==");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void test(@TempDir Path tempDir) throws IOException {
Files.writeString(file, "hello world!", CREATE_NEW);
var putObjectRequest = PutObjectRequest.builder();
integrityCheck.addChecksumToRequest(file, putObjectRequest);
assertThat(putObjectRequest.build().checksumCRC32C()).isEqualTo("49CB5777");
assertThat(putObjectRequest.build().checksumCRC32C()).isEqualTo("SctXdw==");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void test(@TempDir Path tempDir) throws IOException {
Files.writeString(file, "hello world!", CREATE_NEW);
var putObjectRequest = PutObjectRequest.builder();
integrityCheck.addChecksumToRequest(file, putObjectRequest);
assertThat(putObjectRequest.build().checksumCRC64NVME()).isEqualTo("D9160D1FA8E418E3");
assertThat(putObjectRequest.build().checksumCRC64NVME()).isEqualTo("2RYNH6jkGOM=");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.assertj.core.api.BDDAssertions.entry;

import static org.assertj.core.api.BDDAssertions.then;
import static org.assertj.core.api.BDDAssertions.thenExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -290,16 +291,24 @@ public void withAndGetIntegrityCheckAlgorithm() throws Exception {
then(config.withIntegrityCheckAlgorithm("CRC64NVME").getIntegrityCheckAlgorithm()).isEqualTo("CRC64NVME");

var map = new HashMap<String, String>();
map.put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "1212");
map.put(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "invalid");
var c = new S3NioSpiConfiguration(map);

then(c.getIntegrityCheckAlgorithm()).isEqualTo("1212");
thenExceptionOfType(UnsupportedOperationException.class)
.isThrownBy(() -> c.getIntegrityCheckAlgorithm())
.withMessage("unknown integrity check algorithm 'invalid'");

thenExceptionOfType(UnsupportedOperationException.class)
.isThrownBy(() -> c.withIntegrityCheckAlgorithm("unknown algorithm"))
.withMessage("unknown integrity check algorithm 'unknown algorithm'");

withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "CRC32C")
.execute(() -> then(new S3NioSpiConfiguration().getIntegrityCheckAlgorithm()).isEqualTo("CRC32C"));

withEnvironmentVariable("S3_INTEGRITY_CHECK_ALGORITHM", "CRC64NVME")
.execute(() -> then(new S3NioSpiConfiguration().getIntegrityCheckAlgorithm()).isEqualTo("CRC64NVME"));

then(new S3NioSpiConfiguration(Map.of(S3_INTEGRITY_CHECK_ALGORITHM_PROPERTY, "CRC32")).getIntegrityCheckAlgorithm()).isEqualTo("CRC32");
}

}