diff --git a/NOTICES b/NOTICES index 6c073a52d6..e9714e2a57 100644 --- a/NOTICES +++ b/NOTICES @@ -37,7 +37,7 @@ Apache Ivy (org.apache.ivy:ivy:2.5.2): - Apache License, Version 2.0 -Snappy Java (org.xerial.snappy:snappy-java:1.1.10.1): +Snappy Java (org.xerial.snappy:snappy-java:1.1.10.4): - Apache License, Version 2.0 diff --git a/java/athena/src/main/java/sleeper/athena/metadata/SleeperMetadataHandler.java b/java/athena/src/main/java/sleeper/athena/metadata/SleeperMetadataHandler.java index d9d34f55b3..964d60ae69 100644 --- a/java/athena/src/main/java/sleeper/athena/metadata/SleeperMetadataHandler.java +++ b/java/athena/src/main/java/sleeper/athena/metadata/SleeperMetadataHandler.java @@ -47,6 +47,7 @@ import com.google.gson.Gson; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +106,7 @@ public SleeperMetadataHandler(AmazonS3 s3Client, AmazonDynamoDB dynamoDBClient, this.instanceProperties = new InstanceProperties(); this.instanceProperties.loadFromS3(s3Client, configBucket); this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties); - this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, new Configuration()); } public SleeperMetadataHandler(AmazonS3 s3Client, @@ -121,7 +122,7 @@ public SleeperMetadataHandler(AmazonS3 s3Client, this.instanceProperties = new InstanceProperties(); this.instanceProperties.loadFromS3(s3Client, configBucket); this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties); - this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, new Configuration()); } /** diff --git a/java/athena/src/test/java/sleeper/athena/TestUtils.java b/java/athena/src/test/java/sleeper/athena/TestUtils.java index 1a04982a48..061394d860 100644 --- a/java/athena/src/test/java/sleeper/athena/TestUtils.java +++ b/java/athena/src/test/java/sleeper/athena/TestUtils.java @@ -99,7 +99,7 @@ public static void ingestData(AmazonDynamoDB dynamoClient, AmazonS3 s3Client, St IngestFactory factory = IngestFactory.builder() .objectFactory(ObjectFactory.noUserJars()) .localDir(dataDir) - .stateStoreProvider(new StateStoreProvider(dynamoClient, instanceProperties)) + .stateStoreProvider(new StateStoreProvider(dynamoClient, instanceProperties, new Configuration())) .hadoopConfiguration(new Configuration()) .instanceProperties(instanceProperties) .build(); diff --git a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportJobDriver.java b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportJobDriver.java index e35e3b543b..8d06b5cd70 100644 --- a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportJobDriver.java +++ b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportJobDriver.java @@ -24,6 +24,7 @@ import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest; import com.amazonaws.services.securitytoken.model.GetCallerIdentityResult; import com.google.gson.JsonSyntaxException; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,7 @@ import sleeper.ingest.job.status.IngestJobStatusStore; import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory; import sleeper.statestore.StateStoreProvider; +import sleeper.utils.HadoopConfigurationProvider; import java.io.IOException; import java.nio.file.Files; @@ -78,17 +80,17 @@ public BulkImportJobDriver(SessionRunner sessionRunner, } public static BulkImportJobDriver from(BulkImportJobRunner jobRunner, InstanceProperties instanceProperties, - AmazonS3 s3Client, AmazonDynamoDB dynamoClient) { - return from(jobRunner, instanceProperties, s3Client, dynamoClient, + AmazonS3 s3Client, AmazonDynamoDB dynamoClient, Configuration conf) { + return from(jobRunner, instanceProperties, s3Client, dynamoClient, conf, IngestJobStatusStoreFactory.getStatusStore(dynamoClient, instanceProperties), Instant::now); } public static BulkImportJobDriver from(BulkImportJobRunner jobRunner, InstanceProperties instanceProperties, - AmazonS3 s3Client, AmazonDynamoDB dynamoClient, + AmazonS3 s3Client, AmazonDynamoDB dynamoClient, Configuration conf, IngestJobStatusStore statusStore, Supplier getTime) { TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties); - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties, conf); return new BulkImportJobDriver(new BulkImportSparkSessionRunner( jobRunner, instanceProperties, tablePropertiesProvider, stateStoreProvider), tablePropertiesProvider, stateStoreProvider, statusStore, getTime); @@ -195,7 +197,8 @@ public static void start(String[] args, BulkImportJobRunner runner) throws Excep } BulkImportJobDriver driver = BulkImportJobDriver.from(runner, instanceProperties, - amazonS3, AmazonDynamoDBClientBuilder.defaultClient()); + amazonS3, AmazonDynamoDBClientBuilder.defaultClient(), + HadoopConfigurationProvider.getConfigurationForEMR(instanceProperties)); driver.run(bulkImportJob, jobRunId, taskId); } } diff --git a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportSparkSessionRunner.java b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportSparkSessionRunner.java index 2a856c5461..65de70ba72 100644 --- a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportSparkSessionRunner.java +++ b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/BulkImportSparkSessionRunner.java @@ -93,6 +93,7 @@ public BulkImportJobOutput run(BulkImportJob job) throws IOException { try { allPartitions = stateStore.getAllPartitions(); } catch (StateStoreException e) { + LOGGER.error("Could not load partitions", e); throw new RuntimeException("Failed to load statestore. Are permissions correct for this service account?"); } diff --git a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/SparkFileInfoRow.java b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/SparkFileInfoRow.java index 2bd76900da..c157ded780 100644 --- a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/SparkFileInfoRow.java +++ b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/job/runner/SparkFileInfoRow.java @@ -21,6 +21,8 @@ import sleeper.core.statestore.FileInfo; +import java.time.Instant; + public class SparkFileInfoRow { private SparkFileInfoRow() { @@ -34,6 +36,7 @@ public static FileInfo createFileInfo(Row row) { return FileInfo.builder() .filename(row.getAs(FILENAME_FIELD_NAME)) .jobId(null) + .lastStateStoreUpdateTime(Instant.now()) .fileStatus(FileInfo.FileStatus.ACTIVE) .partitionId(row.getAs(PARTITION_FIELD_NAME)) .numberOfRecords(row.getAs(NUM_RECORDS_FIELD_NAME)) diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/job/runner/BulkImportJobDriverIT.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/job/runner/BulkImportJobDriverIT.java index e36c349663..1889cbe7d1 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/job/runner/BulkImportJobDriverIT.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/job/runner/BulkImportJobDriverIT.java @@ -20,6 +20,7 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetWriter; import org.junit.jupiter.api.AfterAll; @@ -59,8 +60,10 @@ import sleeper.ingest.job.status.WriteToMemoryIngestJobStatusStore; import sleeper.io.parquet.record.ParquetRecordReader; import sleeper.io.parquet.record.ParquetRecordWriterFactory; -import sleeper.statestore.StateStoreProvider; +import sleeper.statestore.dynamodb.DynamoDBStateStore; import sleeper.statestore.dynamodb.DynamoDBStateStoreCreator; +import sleeper.statestore.s3.S3StateStore; +import sleeper.statestore.s3.S3StateStoreCreator; import java.io.BufferedWriter; import java.io.FileWriter; @@ -84,6 +87,7 @@ import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.DATA_BUCKET; import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; +import static sleeper.configuration.properties.table.TableProperty.STATESTORE_CLASSNAME; import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME; import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client; import static sleeper.core.record.process.RecordsProcessedSummaryTestData.summary; @@ -166,11 +170,18 @@ public InstanceProperties createInstanceProperties(AmazonS3 s3Client, String dir s3Client.createBucket(instanceProperties.get(CONFIG_BUCKET)); new DynamoDBStateStoreCreator(instanceProperties, dynamoDBClient).create(); + new S3StateStoreCreator(instanceProperties, dynamoDBClient).create(); return instanceProperties; } public TableProperties createTable(InstanceProperties instanceProperties) { + TableProperties tableProperties = createTableProperties(instanceProperties); + tableProperties.saveToS3(s3Client); + return tableProperties; + } + + public TableProperties createTableProperties(InstanceProperties instanceProperties) { String tableName = UUID.randomUUID().toString(); TableProperties tableProperties = new TableProperties(instanceProperties); tableProperties.set(TABLE_NAME, tableName); @@ -270,8 +281,7 @@ private static void writeRecordsToFile(List records, String file) throws } private static StateStore initialiseStateStore(AmazonDynamoDB dynamoDBClient, InstanceProperties instanceProperties, TableProperties tableProperties, List splitPoints) throws StateStoreException { - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); - StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); + StateStore stateStore = new DynamoDBStateStore(instanceProperties, tableProperties, dynamoDBClient); stateStore.initialise(new PartitionsFromSplitPoints(getSchema(), splitPoints).construct()); return stateStore; } @@ -280,11 +290,17 @@ private static StateStore initialiseStateStore(AmazonDynamoDB dynamoDBClient, In return initialiseStateStore(dynamoDBClient, instanceProperties, tableProperties, Collections.emptyList()); } + private static StateStore initialiseS3StateStore(AmazonDynamoDB dynamoDBClient, InstanceProperties instanceProperties, TableProperties tableProperties) throws StateStoreException { + StateStore stateStore = new S3StateStore(instanceProperties, tableProperties, dynamoDBClient, new Configuration()); + stateStore.initialise(); + return stateStore; + } + private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job) throws IOException { String jobRunId = "test-run"; statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build()); BulkImportJobDriver driver = BulkImportJobDriver.from(runner, properties, - s3Client, dynamoDBClient, statusStore, + s3Client, dynamoDBClient, new Configuration(), statusStore, List.of(startTime, endTime).iterator()::next); driver.run(job, jobRunId, taskId); } @@ -530,4 +546,54 @@ void shouldNotThrowExceptionIfProvidedWithDirectoryWhichContainsParquetAndNonPar finishedIngestJobWithValidation(job.toIngestJob(), taskId, validationTime, summary(startTime, endTime, records.size(), records.size()))); } + + @ParameterizedTest + @MethodSource("getParameters") + void shouldImportDataWithS3StateStore(BulkImportJobRunner runner) throws IOException, StateStoreException { + // Given + // - Instance and table properties + String dataDir = folder.toString(); + InstanceProperties instanceProperties = createInstanceProperties(s3Client, dataDir); + TableProperties tableProperties = createTableProperties(instanceProperties); + tableProperties.set(STATESTORE_CLASSNAME, S3StateStore.class.getName()); + tableProperties.saveToS3(s3Client); + // - Write some data to be imported + List records = getRecords(); + writeRecordsToFile(records, dataDir + "/import/a.parquet"); + List inputFiles = new ArrayList<>(); + inputFiles.add(dataDir + "/import/a.parquet"); + // - State store + StateStore stateStore = initialiseS3StateStore(dynamoDBClient, instanceProperties, tableProperties); + + // When + BulkImportJob job = BulkImportJob.builder().id("my-job").files(inputFiles) + .tableName(tableProperties.get(TABLE_NAME)).build(); + runJob(runner, instanceProperties, job); + + // Then + List activeFiles = stateStore.getActiveFiles(); + List readRecords = new ArrayList<>(); + for (FileInfo fileInfo : activeFiles) { + try (ParquetRecordReader reader = new ParquetRecordReader(new Path(fileInfo.getFilename()), schema)) { + List recordsInThisFile = new ArrayList<>(); + Record record = reader.read(); + while (null != record) { + Record clonedRecord = new Record(record); + readRecords.add(clonedRecord); + recordsInThisFile.add(clonedRecord); + record = reader.read(); + } + assertThat(recordsInThisFile).isSortedAccordingTo(new RecordComparator(getSchema())); + } + } + assertThat(readRecords).hasSameSizeAs(records); + + List expectedRecords = new ArrayList<>(records); + sortRecords(expectedRecords); + sortRecords(readRecords); + assertThat(readRecords).isEqualTo(expectedRecords); + assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_NAME))).containsExactly( + finishedIngestJobWithValidation(job.toIngestJob(), taskId, validationTime, + summary(startTime, endTime, records.size(), records.size()))); + } } diff --git a/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/BulkImportStarterLambda.java b/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/BulkImportStarterLambda.java index b7477da23c..291c95d83e 100644 --- a/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/BulkImportStarterLambda.java +++ b/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/BulkImportStarterLambda.java @@ -36,6 +36,7 @@ import sleeper.ingest.job.status.IngestJobStatusStore; import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory; import sleeper.statestore.StateStoreProvider; +import sleeper.utils.HadoopConfigurationProvider; import sleeper.utils.HadoopPathUtils; import java.time.Instant; @@ -69,10 +70,10 @@ public BulkImportStarterLambda() { TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties); PlatformExecutor platformExecutor = PlatformExecutor.fromEnvironment( instanceProperties, tablePropertiesProvider); - hadoopConfig = new Configuration(); + hadoopConfig = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties); ingestJobStatusStore = IngestJobStatusStoreFactory.getStatusStore(dynamo, instanceProperties); executor = new BulkImportExecutor(instanceProperties, tablePropertiesProvider, - new StateStoreProvider(dynamo, instanceProperties), + new StateStoreProvider(dynamo, instanceProperties, hadoopConfig), ingestJobStatusStore, s3, platformExecutor, Instant::now); invalidJobIdSupplier = () -> UUID.randomUUID().toString(); timeSupplier = Instant::now; diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java index b809a266b9..ffad8717ad 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java @@ -95,6 +95,7 @@ public TableStack( .runtime(Runtime.JAVA_11)); configBucket.grantReadWrite(sleeperTableLambda); + dataStack.getDataBucket().grantReadWrite(sleeperTableLambda); // S3 state store stores its state in data bucket Provider sleeperTableProvider = Provider.Builder.create(this, "SleeperTableProvider") .onEventHandler(sleeperTableLambda) diff --git a/java/clients/src/main/java/sleeper/clients/QueryClient.java b/java/clients/src/main/java/sleeper/clients/QueryClient.java index 5e11ed035f..afd2d54744 100644 --- a/java/clients/src/main/java/sleeper/clients/QueryClient.java +++ b/java/clients/src/main/java/sleeper/clients/QueryClient.java @@ -57,10 +57,10 @@ public class QueryClient extends QueryCommandLineClient { private final ExecutorService executorService; private final Map cachedQueryExecutors = new HashMap<>(); - public QueryClient(AmazonS3 s3Client, InstanceProperties instanceProperties, AmazonDynamoDB dynamoClient) throws ObjectFactoryException { + public QueryClient(AmazonS3 s3Client, InstanceProperties instanceProperties, AmazonDynamoDB dynamoClient, Configuration conf) throws ObjectFactoryException { super(s3Client, instanceProperties); this.objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp"); - this.stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties); + this.stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties, conf); this.executorService = Executors.newFixedThreadPool(30); } @@ -121,7 +121,7 @@ public static void main(String[] args) throws StateStoreException, ObjectFactory AmazonDynamoDB dynamoDB = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard()); InstanceProperties instanceProperties = ClientUtils.getInstanceProperties(amazonS3, args[0]); - QueryClient queryClient = new QueryClient(amazonS3, instanceProperties, dynamoDB); + QueryClient queryClient = new QueryClient(amazonS3, instanceProperties, dynamoDB, new Configuration()); queryClient.run(); } } diff --git a/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceIT.java b/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceIT.java index ea36cdc603..e5ccc5cb00 100644 --- a/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceIT.java +++ b/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceIT.java @@ -124,7 +124,7 @@ private void ingestRecords(InstanceProperties instanceProperties, TablePropertie .objectFactory(ObjectFactory.noUserJars()) .localDir(tempDir.toString()) .hadoopConfiguration(getHadoopConfiguration()) - .stateStoreProvider(new StateStoreProvider(dynamoDB, instanceProperties)) + .stateStoreProvider(new StateStoreProvider(dynamoDB, instanceProperties, null)) .s3AsyncClient(createS3AsyncClient()) .build().ingestFromRecordIteratorAndClose(tableProperties, new WrappedIterator<>(records.iterator())); } diff --git a/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceTestBase.java b/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceTestBase.java index 35e78ca76d..d4375f4aae 100644 --- a/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceTestBase.java +++ b/java/clients/src/test/java/sleeper/clients/deploy/docker/DockerInstanceTestBase.java @@ -61,7 +61,7 @@ public class DockerInstanceTestBase { public CloseableIterator queryAllRecords( InstanceProperties instanceProperties, TableProperties tableProperties) throws Exception { - StateStore stateStore = new StateStoreProvider(dynamoDB, instanceProperties).getStateStore(tableProperties); + StateStore stateStore = new StateStoreProvider(dynamoDB, instanceProperties, null).getStateStore(tableProperties); PartitionTree tree = new PartitionTree(tableProperties.getSchema(), stateStore.getAllPartitions()); QueryExecutor executor = new QueryExecutor(ObjectFactory.noUserJars(), tableProperties, stateStore, getHadoopConfiguration(), Executors.newSingleThreadExecutor()); diff --git a/java/compaction/compaction-job-creation/src/main/java/sleeper/compaction/job/creation/CreateJobsLambda.java b/java/compaction/compaction-job-creation/src/main/java/sleeper/compaction/job/creation/CreateJobsLambda.java index a15915776f..f0b7f5110e 100644 --- a/java/compaction/compaction-job-creation/src/main/java/sleeper/compaction/job/creation/CreateJobsLambda.java +++ b/java/compaction/compaction-job-creation/src/main/java/sleeper/compaction/job/creation/CreateJobsLambda.java @@ -107,7 +107,8 @@ public CreateJobsLambda(InstanceProperties instanceProperties, .build(); this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties); this.propertiesReloader = PropertiesReloader.ifConfigured(s3Client, instanceProperties, tablePropertiesProvider); - this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, + HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties)); this.tableLister = new TableLister(s3Client, instanceProperties); this.jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); } diff --git a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateJobsIT.java b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateJobsIT.java index e035196c9d..b052f05935 100644 --- a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateJobsIT.java +++ b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateJobsIT.java @@ -86,7 +86,7 @@ public void setUp() throws Exception { AmazonDynamoDB dynamoDB = createDynamoClient(); TableProperties tableProperties = createTable(s3, dynamoDB, instanceProperties, schema); TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties); - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties, null); stateStore = stateStoreProvider.getStateStore(tableProperties); stateStore.initialise(); DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDB); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/jobexecution/CompactSortedFilesRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/jobexecution/CompactSortedFilesRunnerLocalStackIT.java index 63f26144ea..dd794ee42a 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/jobexecution/CompactSortedFilesRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/jobexecution/CompactSortedFilesRunnerLocalStackIT.java @@ -144,7 +144,7 @@ void shouldDeleteMessages() throws Exception { String tableName = UUID.randomUUID().toString(); InstanceProperties instanceProperties = createProperties(s3); TableProperties tableProperties = createTable(s3, dynamoDB, instanceProperties, tableName, schema); - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties, null); TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); stateStore.initialise(); diff --git a/java/core/src/main/java/sleeper/core/key/KeySerDe.java b/java/core/src/main/java/sleeper/core/key/KeySerDe.java index 5a21f40c99..6051af44c7 100644 --- a/java/core/src/main/java/sleeper/core/key/KeySerDe.java +++ b/java/core/src/main/java/sleeper/core/key/KeySerDe.java @@ -59,7 +59,7 @@ public KeySerDe(List rowKeyTypes) { public byte[] serialise(Key key) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); - int numKeys = key.size(); + int numKeys = key != null ? key.size() : 0; int numKeysToSerialise = Math.min(numKeys, numRowKeysInSchema); dos.writeInt(numKeysToSerialise); for (int i = 0; i < numKeysToSerialise; i++) { @@ -148,6 +148,9 @@ public Key deserialise(byte[] bytes) throws IOException { } } dis.close(); + if (key.isEmpty()) { + return null; + } return Key.create(key); } } diff --git a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java index 7e0b5da378..e8b1375d21 100644 --- a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java @@ -82,12 +82,18 @@ public Map> getPartitionToActiveFilesMap() throws StateStor @Override public void initialise() throws StateStoreException { + if (!isHasNoFiles()) { + throw new StateStoreException("Cannot initialise state store when files are present"); + } partitionStore.initialise(); fileInfoStore.initialise(); } @Override public void initialise(List partitions) throws StateStoreException { + if (!isHasNoFiles()) { + throw new StateStoreException("Cannot initialise state store when files are present"); + } partitionStore.initialise(partitions); fileInfoStore.initialise(); } @@ -107,6 +113,11 @@ public List getLeafPartitions() throws StateStoreException { return partitionStore.getLeafPartitions(); } + @Override + public boolean isHasNoFiles() { + return fileInfoStore.isHasNoFiles(); + } + @Override public void clearTable() { fileInfoStore.clearTable(); diff --git a/java/core/src/main/java/sleeper/core/statestore/FileInfoStore.java b/java/core/src/main/java/sleeper/core/statestore/FileInfoStore.java index 95d84f0d1d..435722c778 100644 --- a/java/core/src/main/java/sleeper/core/statestore/FileInfoStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/FileInfoStore.java @@ -124,5 +124,7 @@ void atomicallyUpdateJobStatusOfFiles(String jobId, List fileInfos) void initialise() throws StateStoreException; + boolean isHasNoFiles(); + void clearTable(); } diff --git a/java/core/src/test/java/sleeper/core/key/KeySerDeTest.java b/java/core/src/test/java/sleeper/core/key/KeySerDeTest.java index 0af9f017dd..30cf187495 100644 --- a/java/core/src/test/java/sleeper/core/key/KeySerDeTest.java +++ b/java/core/src/test/java/sleeper/core/key/KeySerDeTest.java @@ -180,4 +180,19 @@ public void shouldSerialiseAndDeserialiseCorrectlyWhenThereAreFewerKeysThanInSch assertThat(deserialisedKey4.size()).isOne(); assertThat(deserialisedKey4.get(0)).isEqualTo(1); } + + @Test + public void shouldSerialiseAndDeserialiseNullCorrectly() throws IOException { + // Given + Schema schema = Schema.builder().rowKeyFields(new Field("rowkey1", new StringType())).build(); + KeySerDe keySerDe = new KeySerDe(schema); + Key key = null; + + // When + byte[] serialised = keySerDe.serialise(key); + Key deserialisedKeys = keySerDe.deserialise(serialised); + + // Then + assertThat(deserialisedKeys).isNull(); + } } diff --git a/java/core/src/test/java/sleeper/core/statestore/inmemory/InMemoryFileInfoStore.java b/java/core/src/test/java/sleeper/core/statestore/inmemory/InMemoryFileInfoStore.java index 5f112c49d8..4eae7f819a 100644 --- a/java/core/src/test/java/sleeper/core/statestore/inmemory/InMemoryFileInfoStore.java +++ b/java/core/src/test/java/sleeper/core/statestore/inmemory/InMemoryFileInfoStore.java @@ -120,6 +120,11 @@ public void initialise() { } + @Override + public boolean isHasNoFiles() { + return activeFiles.isEmpty() && readyForGCFiles.isEmpty(); + } + @Override public void clearTable() { activeFiles.clear(); diff --git a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java index 2fc2c78768..d66f7b0952 100644 --- a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java +++ b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java @@ -107,7 +107,7 @@ class SingleTable { void setupStateStoreWithFixedTime(Instant fixedTime) throws Exception { new DynamoDBStateStoreCreator(instanceProperties, dynamoDBClient).create(); - stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, null); DynamoDBStateStore stateStore = (DynamoDBStateStore) stateStoreProvider.getStateStore(tableProperties); stateStore.initialise(); stateStore.fixTime(fixedTime); @@ -228,7 +228,7 @@ class MultipleTables { void setupStateStoreWithFixedTime(Instant fixedTime) throws Exception { new DynamoDBStateStoreCreator(instanceProperties, dynamoDBClient).create(); - stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, null); DynamoDBStateStore stateStore1 = (DynamoDBStateStore) stateStoreProvider.getStateStore(tableProperties1); stateStore1.initialise(); stateStore1.fixTime(fixedTime); diff --git a/java/parquet/src/main/java/sleeper/utils/HadoopConfigurationProvider.java b/java/parquet/src/main/java/sleeper/utils/HadoopConfigurationProvider.java index e0efedc028..c1e075467b 100644 --- a/java/parquet/src/main/java/sleeper/utils/HadoopConfigurationProvider.java +++ b/java/parquet/src/main/java/sleeper/utils/HadoopConfigurationProvider.java @@ -48,6 +48,10 @@ public static Configuration getConfigurationForQueryLambdas(InstanceProperties i return conf; } + public static Configuration getConfigurationForEMR(InstanceProperties instanceProperties) { + return getConfigurationForECS(instanceProperties); + } + public static Configuration getConfigurationForECS(InstanceProperties instanceProperties) { Configuration conf = new Configuration(); conf.set("fs.s3a.connection.maximum", instanceProperties.get(MAXIMUM_CONNECTIONS_TO_S3)); diff --git a/java/pom.xml b/java/pom.xml index 19703b7398..0edda4958d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -82,7 +82,7 @@ Snappy is used by both hadoop-client and parquet-hadoop. We were getting multiple versions added to the classpath, and they both have vulnerabilities. --> - 1.1.10.1 + 1.1.10.4 3.23.3 @@ -165,6 +165,7 @@ false false false + 3.2.0 3.0.0-M7 @@ -1042,6 +1043,10 @@ sleeper.system.test.instances.force.redeploy ${sleeper.system.test.instances.force.redeploy} + + sleeper.system.test.force.statestore.classname + ${sleeper.system.test.force.statestore.classname} + SystemTest diff --git a/java/query/src/main/java/sleeper/query/executor/QueryExecutor.java b/java/query/src/main/java/sleeper/query/executor/QueryExecutor.java index a68bd3aaba..aa3df3a45b 100644 --- a/java/query/src/main/java/sleeper/query/executor/QueryExecutor.java +++ b/java/query/src/main/java/sleeper/query/executor/QueryExecutor.java @@ -15,18 +15,12 @@ */ package sleeper.query.executor; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sleeper.configuration.jars.ObjectFactory; -import sleeper.configuration.jars.ObjectFactoryException; -import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TableProperties; -import sleeper.configuration.properties.table.TablePropertiesProvider; import sleeper.core.iterator.CloseableIterator; import sleeper.core.iterator.ConcatenatingIterator; import sleeper.core.partition.Partition; @@ -40,7 +34,6 @@ import sleeper.query.model.LeafPartitionQuery; import sleeper.query.model.Query; import sleeper.query.recordretrieval.LeafPartitionQueryExecutor; -import sleeper.statestore.StateStoreProvider; import java.util.ArrayList; import java.util.HashMap; @@ -95,19 +88,6 @@ public QueryExecutor(ObjectFactory objectFactory, tableProperties.get(ITERATOR_CONFIG), tableProperties, configuration, executorService); } - public QueryExecutor(AmazonS3 s3Client, - AmazonDynamoDB dynamoDBClient, - InstanceProperties instanceProperties, - TablePropertiesProvider tablePropertiesProvider, - String tableName, - ExecutorService executorService) throws ObjectFactoryException { - this(new ObjectFactory(instanceProperties, s3Client, "/tmp"), - tablePropertiesProvider.getTableProperties(tableName), - new StateStoreProvider(AmazonDynamoDBClientBuilder.defaultClient(), instanceProperties).getStateStore(tablePropertiesProvider.getTableProperties(tableName)), - new Configuration(), - executorService); - } - /** * Initialises the partitions and the mapping from partitions to active files. * This method should be called periodically so that this class is aware of diff --git a/java/query/src/test/java/sleeper/query/executor/QueryExecutorIT.java b/java/query/src/test/java/sleeper/query/executor/QueryExecutorIT.java index a83f37e41b..edd57cf3fe 100644 --- a/java/query/src/test/java/sleeper/query/executor/QueryExecutorIT.java +++ b/java/query/src/test/java/sleeper/query/executor/QueryExecutorIT.java @@ -747,11 +747,12 @@ public void shouldReturnCorrectDataWhenRecordsInMultipleFilesInMultiplePartition ingestData(instanceProperties, stateStore, tableProperties, records.iterator()); // Split the root partition into 2: - stateStore.initialise(new PartitionsBuilder(schema) + PartitionTree partialTree = new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildrenOnDimension("root", "left", "right", 0, "I") - .buildList() - ); + .buildTree(); + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(partialTree.getPartition("root"), + partialTree.getPartition("left"), partialTree.getPartition("right")); ingestData(instanceProperties, stateStore, tableProperties, records.iterator()); @@ -761,7 +762,10 @@ public void shouldReturnCorrectDataWhenRecordsInMultipleFilesInMultiplePartition .splitToNewChildrenOnDimension("left", "P1", "P3", 1, "T") .splitToNewChildrenOnDimension("right", "P2", "P4", 1, "T") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(tree.getPartition("left"), + tree.getPartition("P1"), tree.getPartition("P3")); + stateStore.atomicallyUpdatePartitionAndCreateNewOnes(tree.getPartition("right"), + tree.getPartition("P2"), tree.getPartition("P4")); ingestData(instanceProperties, stateStore, tableProperties, records.iterator()); List filesInLeafPartition1 = stateStore.getActiveFiles().stream() diff --git a/java/query/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java b/java/query/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java index 777108a226..5e7d49fd95 100644 --- a/java/query/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java +++ b/java/query/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java @@ -741,7 +741,7 @@ private TableProperties createTimeSeriesTable(InstanceProperties instancePropert AmazonDynamoDB dynamoClient = createDynamoClient(); new TableCreator(s3Client, dynamoClient, instanceProperties).createTable(tableProperties); - StateStore stateStore = new StateStoreProvider(dynamoClient, instanceProperties).getStateStore(tableProperties); + StateStore stateStore = new StateStoreProvider(dynamoClient, instanceProperties, null).getStateStore(tableProperties); try { InitialiseStateStore.createInitialiseStateStoreFromSplitPoints(tableProperties, stateStore, splitPoints).run(); } catch (StateStoreException e) { diff --git a/java/statestore/src/main/java/sleeper/statestore/StateStoreProvider.java b/java/statestore/src/main/java/sleeper/statestore/StateStoreProvider.java index 4a3787776f..7529552bc4 100644 --- a/java/statestore/src/main/java/sleeper/statestore/StateStoreProvider.java +++ b/java/statestore/src/main/java/sleeper/statestore/StateStoreProvider.java @@ -39,11 +39,6 @@ public StateStoreProvider(AmazonDynamoDB dynamoDBClient, this(new StateStoreFactory(dynamoDBClient, instanceProperties, configuration)::getStateStore); } - public StateStoreProvider(AmazonDynamoDB dynamoDBClient, - InstanceProperties instanceProperties) { - this(dynamoDBClient, instanceProperties, null); - } - protected StateStoreProvider(Function stateStoreFactory) { this.stateStoreFactory = stateStoreFactory; this.tableNameToStateStoreCache = new HashMap<>(); diff --git a/java/statestore/src/main/java/sleeper/statestore/dynamodb/DynamoDBFileInfoStore.java b/java/statestore/src/main/java/sleeper/statestore/dynamodb/DynamoDBFileInfoStore.java index 5739256dd9..3d6f4a99b3 100644 --- a/java/statestore/src/main/java/sleeper/statestore/dynamodb/DynamoDBFileInfoStore.java +++ b/java/statestore/src/main/java/sleeper/statestore/dynamodb/DynamoDBFileInfoStore.java @@ -30,6 +30,7 @@ import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.PutItemResult; import com.amazonaws.services.dynamodbv2.model.QueryRequest; +import com.amazonaws.services.dynamodbv2.model.QueryResult; import com.amazonaws.services.dynamodbv2.model.RequestLimitExceededException; import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity; @@ -390,6 +391,26 @@ private List> queryTrackingCapacity( public void initialise() { } + @Override + public boolean isHasNoFiles() { + return isTableEmpty(activeTableName) && isTableEmpty(readyForGCTableName); + } + + private boolean isTableEmpty(String tableName) { + QueryResult result = dynamoDB.query(new QueryRequest() + .withTableName(tableName) + .withExpressionAttributeNames(Map.of("#TableName", TABLE_NAME)) + .withExpressionAttributeValues(new DynamoDBRecordBuilder() + .string(":table_name", sleeperTableName) + .build()) + .withKeyConditionExpression("#TableName = :table_name") + .withConsistentRead(stronglyConsistentReads) + .withLimit(1) + .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)); + LOGGER.debug("Scanned for any file in table {}, capacity consumed = {}", tableName, result.getConsumedCapacity().getCapacityUnits()); + return result.getItems().isEmpty(); + } + @Override public void clearTable() { clearDynamoTable(activeTableName); diff --git a/java/statestore/src/main/java/sleeper/statestore/s3/S3FileInfoStore.java b/java/statestore/src/main/java/sleeper/statestore/s3/S3FileInfoStore.java index 4e2adeaddf..2b63df20bb 100644 --- a/java/statestore/src/main/java/sleeper/statestore/s3/S3FileInfoStore.java +++ b/java/statestore/src/main/java/sleeper/statestore/s3/S3FileInfoStore.java @@ -433,6 +433,20 @@ public void initialise() throws StateStoreException { s3RevisionUtils.saveFirstFilesRevision(firstRevisionId); } + @Override + public boolean isHasNoFiles() { + RevisionId revisionId = getCurrentFilesRevisionId(); + if (revisionId == null) { + return true; + } + String path = getFilesPath(revisionId); + try (ParquetReader reader = fileInfosReader(path)) { + return reader.read() == null; + } catch (IOException e) { + throw new UncheckedIOException("Failed loading files", e); + } + } + @Override public void clearTable() { Path path = new Path(fs + s3Path + "/statestore/files"); @@ -492,17 +506,21 @@ private void writeFileInfosToParquet(List fileInfos, String path) thro private List readFileInfosFromParquet(String path) throws IOException { List fileInfos = new ArrayList<>(); - ParquetReader reader = new ParquetRecordReader.Builder(new Path(path), fileSchema) - .withConf(conf) - .build(); - ParquetReaderIterator recordReader = new ParquetReaderIterator(reader); - while (recordReader.hasNext()) { - fileInfos.add(getFileInfoFromRecord(recordReader.next())); + try (ParquetReader reader = fileInfosReader(path)) { + ParquetReaderIterator recordReader = new ParquetReaderIterator(reader); + while (recordReader.hasNext()) { + fileInfos.add(getFileInfoFromRecord(recordReader.next())); + } } - recordReader.close(); return fileInfos; } + private ParquetReader fileInfosReader(String path) throws IOException { + return new ParquetRecordReader.Builder(new Path(path), fileSchema) + .withConf(conf) + .build(); + } + public void fixTime(Instant now) { clock = Clock.fixed(now, ZoneId.of("UTC")); } diff --git a/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreIT.java b/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreIT.java index 29b3b59e0f..bad9f05118 100644 --- a/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreIT.java @@ -21,13 +21,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.properties.table.TableProperties; -import sleeper.core.CommonTestConstants; import sleeper.core.key.Key; import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionTree; @@ -45,8 +43,10 @@ import sleeper.core.schema.type.StringType; import sleeper.core.schema.type.Type; import sleeper.core.statestore.FileInfo; +import sleeper.core.statestore.FileInfoFactory; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; +import sleeper.dynamodb.tools.DynamoDBContainer; import java.time.Instant; import java.util.ArrayList; @@ -64,22 +64,21 @@ import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.configuration.properties.table.TablePropertiesTestHelper.createTestTableProperties; import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; +import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; import static sleeper.dynamodb.tools.GenericContainerAwsV1ClientHelper.buildAwsV1Client; @Testcontainers public class DynamoDBStateStoreIT { - private static final int DYNAMO_PORT = 8000; private static AmazonDynamoDB dynamoDBClient; @Container - public static GenericContainer dynamoDb = new GenericContainer(CommonTestConstants.DYNAMODB_LOCAL_CONTAINER) - .withExposedPorts(DYNAMO_PORT); + public static DynamoDBContainer dynamoDb = new DynamoDBContainer(); private final InstanceProperties instanceProperties = createTestInstanceProperties(); @BeforeAll public static void initDynamoClient() { - dynamoDBClient = buildAwsV1Client(dynamoDb, DYNAMO_PORT, AmazonDynamoDBClientBuilder.standard()); + dynamoDBClient = buildAwsV1Client(dynamoDb, dynamoDb.getDynamoPort(), AmazonDynamoDBClientBuilder.standard()); } @AfterAll @@ -96,7 +95,7 @@ private DynamoDBStateStore getStateStore(Schema schema, List partitions, int garbageCollectorDelayBeforeDeletionInMinutes) throws StateStoreException { TableProperties tableProperties = createTestTableProperties(instanceProperties, schema); - tableProperties.set(GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, String.valueOf(garbageCollectorDelayBeforeDeletionInMinutes)); + tableProperties.setNumber(GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, garbageCollectorDelayBeforeDeletionInMinutes); DynamoDBStateStore stateStore = new DynamoDBStateStore(instanceProperties, tableProperties, dynamoDBClient); stateStore.initialise(partitions); return stateStore; @@ -1079,26 +1078,49 @@ public void shouldInitialiseRootPartitionCorrectlyForByteArrayKey() throws State } @Test - public void shouldReinitialisePartitions() throws StateStoreException { + void shouldNotReinitialisePartitionsWhenAFileIsPresent() throws Exception { // Given - Field field = new Field("key", new LongType()); - Schema schema = Schema.builder().rowKeyFields(field).build(); + Schema schema = schemaWithKey("key", new LongType()); PartitionTree treeBefore = new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "before1", "before2", 0L) .buildTree(); - StateStore dynamoDBStateStore = getStateStore(schema, treeBefore.getAllPartitions()); + PartitionTree treeAfter = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "after1", "after2", 10L) + .buildTree(); + StateStore stateStore = getStateStore(schema, treeBefore.getAllPartitions()); + stateStore.addFile(FileInfoFactory.builder() + .schema(schema).partitionTree(treeBefore) + .lastStateStoreUpdate(Instant.now()) + .build().leafFile(100L, 1L, 100L)); - // When + // When / Then + assertThatThrownBy(() -> stateStore.initialise(treeAfter.getAllPartitions())) + .isInstanceOf(StateStoreException.class); + assertThat(stateStore.getAllPartitions()) + .containsExactlyInAnyOrderElementsOf(treeBefore.getAllPartitions()); + } + + @Test + public void shouldReinitialisePartitionsWhenNoFilesArePresent() throws StateStoreException { + // Given + Schema schema = schemaWithKey("key", new LongType()); + PartitionTree treeBefore = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "before1", "before2", 0L) + .buildTree(); PartitionTree treeAfter = new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "after1", "after2", 10L) .buildTree(); + StateStore stateStore = getStateStore(schema, treeBefore.getAllPartitions()); - dynamoDBStateStore.initialise(treeAfter.getAllPartitions()); + // When + stateStore.initialise(treeAfter.getAllPartitions()); // Then - assertThat(dynamoDBStateStore.getAllPartitions()) + assertThat(stateStore.getAllPartitions()) .containsExactlyInAnyOrderElementsOf(treeAfter.getAllPartitions()); } } diff --git a/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreMultipleTablesIT.java b/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreMultipleTablesIT.java index 1fc0889ed8..ab02539325 100644 --- a/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreMultipleTablesIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/dynamodb/DynamoDBStateStoreMultipleTablesIT.java @@ -23,12 +23,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import sleeper.configuration.properties.instance.InstanceProperties; -import sleeper.core.CommonTestConstants; import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.schema.Schema; @@ -36,6 +34,7 @@ import sleeper.core.statestore.FileInfo; import sleeper.core.statestore.FileInfoFactory; import sleeper.core.statestore.StateStore; +import sleeper.dynamodb.tools.DynamoDBContainer; import sleeper.statestore.StateStoreFactory; import static org.assertj.core.api.Assertions.assertThat; @@ -46,11 +45,9 @@ @Testcontainers public class DynamoDBStateStoreMultipleTablesIT { - private static final int DYNAMO_PORT = 8000; private static AmazonDynamoDB dynamoDBClient; @Container - public static GenericContainer dynamoDb = new GenericContainer(CommonTestConstants.DYNAMODB_LOCAL_CONTAINER) - .withExposedPorts(DYNAMO_PORT); + public static DynamoDBContainer dynamoDb = new DynamoDBContainer(); private final InstanceProperties instanceProperties = createTestInstanceProperties(); private final Schema schema = schemaWithKey("key", new LongType()); private final StateStoreFactory stateStoreFactory = new StateStoreFactory(dynamoDBClient, instanceProperties, new Configuration()); @@ -58,7 +55,7 @@ public class DynamoDBStateStoreMultipleTablesIT { @BeforeAll public static void initDynamoClient() { - dynamoDBClient = buildAwsV1Client(dynamoDb, DYNAMO_PORT, AmazonDynamoDBClientBuilder.standard()); + dynamoDBClient = buildAwsV1Client(dynamoDb, dynamoDb.getDynamoPort(), AmazonDynamoDBClientBuilder.standard()); } @AfterAll diff --git a/java/statestore/src/test/java/sleeper/statestore/s3/S3StateStoreIT.java b/java/statestore/src/test/java/sleeper/statestore/s3/S3StateStoreIT.java index 4c410c1630..3ff9224800 100644 --- a/java/statestore/src/test/java/sleeper/statestore/s3/S3StateStoreIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/s3/S3StateStoreIT.java @@ -45,6 +45,7 @@ import sleeper.core.schema.type.StringType; import sleeper.core.schema.type.Type; import sleeper.core.statestore.FileInfo; +import sleeper.core.statestore.FileInfoFactory; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.dynamodb.tools.DynamoDBContainer; @@ -73,6 +74,7 @@ import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.DATA_BUCKET; import static sleeper.configuration.properties.table.TablePropertiesTestHelper.createTestTableProperties; import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; +import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; import static sleeper.dynamodb.tools.GenericContainerAwsV1ClientHelper.buildAwsV1Client; @Testcontainers @@ -1247,4 +1249,70 @@ public void shouldInitialiseRootPartitionCorrectlyForByteArrayKey() throws Excep .getPartition(partitions.get(0).getId()); assertThat(partitions).containsExactly(expectedPartition); } + + @Test + void shouldNotReinitialisePartitionsWhenAFileIsPresent() throws Exception { + // Given + Schema schema = schemaWithKey("key", new LongType()); + PartitionTree treeBefore = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "before1", "before2", 0L) + .buildTree(); + PartitionTree treeAfter = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "after1", "after2", 10L) + .buildTree(); + StateStore stateStore = getStateStore(schema, treeBefore.getAllPartitions()); + stateStore.addFile(FileInfoFactory.builder() + .schema(schema).partitionTree(treeBefore) + .lastStateStoreUpdate(Instant.now()) + .build().leafFile(100L, 1L, 100L)); + + // When / Then + assertThatThrownBy(() -> stateStore.initialise(treeAfter.getAllPartitions())) + .isInstanceOf(StateStoreException.class); + assertThat(stateStore.getAllPartitions()) + .containsExactlyInAnyOrderElementsOf(treeBefore.getAllPartitions()); + } + + @Test + void shouldReinitialisePartitionsWhenNoFilesArePresent() throws Exception { + // Given + Schema schema = schemaWithKey("key", new LongType()); + PartitionTree treeBefore = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "before1", "before2", 0L) + .buildTree(); + PartitionTree treeAfter = new PartitionsBuilder(schema) + .rootFirst("root") + .splitToNewChildren("root", "after1", "after2", 10L) + .buildTree(); + StateStore stateStore = getStateStore(schema, treeBefore.getAllPartitions()); + + // When + stateStore.initialise(treeAfter.getAllPartitions()); + + // Then + assertThat(stateStore.getAllPartitions()) + .containsExactlyInAnyOrderElementsOf(treeAfter.getAllPartitions()); + } + + @Test + void shouldStoreFileWhenMinAndMaxKeyAreNotSet() throws Exception { + // Given + Schema schema = schemaWithKey("key", new LongType()); + StateStore stateStore = getStateStore(schema); + FileInfoFactory factory = FileInfoFactory.builder() + .schema(schema).partitions(stateStore.getAllPartitions()) + .lastStateStoreUpdate(Instant.now()) + .build(); + FileInfo fileInfo = factory.leafFile(100L, null, null); + + // When + stateStore.addFile(fileInfo); + + // Then + assertThat(stateStore.getActiveFiles()) + .containsExactly(fileInfo); + } } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SleeperInstanceContext.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SleeperInstanceContext.java index 2fe5169ad5..6ee24a532b 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SleeperInstanceContext.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SleeperInstanceContext.java @@ -20,6 +20,7 @@ import com.amazonaws.services.ecr.AmazonECR; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.providers.AwsRegionProvider; @@ -67,6 +68,7 @@ import static sleeper.configuration.properties.instance.IngestProperty.INGEST_SOURCE_BUCKET; import static sleeper.configuration.properties.instance.IngestProperty.INGEST_SOURCE_ROLE; import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.VERSION; +import static sleeper.configuration.properties.table.TableProperty.STATESTORE_CLASSNAME; import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME; public class SleeperInstanceContext { @@ -214,6 +216,10 @@ private Instance createInstanceIfMissingOrThrow(String identifier, DeployInstanc properties.set(INGEST_SOURCE_BUCKET, systemTest.getSystemTestBucketName()); properties.set(INGEST_SOURCE_ROLE, systemTest.getSystemTestWriterRoleName()); properties.set(ECR_REPOSITORY_PREFIX, parameters.getSystemTestShortId()); + if (parameters.getForceStateStoreClassname() != null) { + deployInstanceConfiguration.getTableProperties() + .set(STATESTORE_CLASSNAME, parameters.getForceStateStoreClassname()); + } DeployNewInstance.builder().scriptsDirectory(parameters.getScriptsDirectory()) .deployInstanceConfiguration(deployInstanceConfiguration) .instanceId(instanceId) @@ -235,7 +241,7 @@ private Instance loadInstance(String identifier, String instanceId, String table instanceProperties.loadFromS3GivenInstanceId(s3, instanceId); TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties); TableProperties tableProperties = tablePropertiesProvider.getTableProperties(tableName); - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties, new Configuration()); return new Instance(identifier, instanceProperties, tableProperties, tablePropertiesProvider, stateStoreProvider); diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SystemTestParameters.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SystemTestParameters.java index 213fa895fd..56f7075333 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SystemTestParameters.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/instance/SystemTestParameters.java @@ -40,6 +40,7 @@ public class SystemTestParameters { private final boolean systemTestClusterEnabled; private final boolean forceRedeploySystemTest; private final boolean forceRedeployInstances; + private final String forceStateStoreClassname; private SystemTestParameters(Builder builder) { shortTestId = builder.shortTestId; @@ -53,6 +54,11 @@ private SystemTestParameters(Builder builder) { systemTestClusterEnabled = builder.systemTestClusterEnabled; forceRedeploySystemTest = builder.forceRedeploySystemTest; forceRedeployInstances = builder.forceRedeployInstances; + forceStateStoreClassname = builder.forceStateStoreClassname; + } + + private static Builder builder() { + return new Builder(); } public static SystemTestParameters loadFromSystemProperties() { @@ -65,22 +71,27 @@ public static SystemTestParameters loadFromSystemProperties() { .subnetIds(System.getProperty("sleeper.system.test.subnet.ids")) .scriptsDirectory(findScriptsDir()) .pythonDirectory(findPythonDir()) - .outputDirectory(Optional.ofNullable(System.getProperty("sleeper.system.test.output.dir")) - .filter(not(String::isEmpty)) + .outputDirectory(getOptionalProperty("sleeper.system.test.output.dir") .map(Path::of) .orElse(null)) .systemTestClusterEnabled(getBooleanProperty("sleeper.system.test.cluster.enabled", false)) .forceRedeploySystemTest(getBooleanProperty("sleeper.system.test.force.redeploy", false)) .forceRedeployInstances(getBooleanProperty("sleeper.system.test.instances.force.redeploy", false)) + .forceStateStoreClassname(getOptionalProperty("sleeper.system.test.force.statestore.classname").orElse(null)) .build(); } private static boolean getBooleanProperty(String property, boolean defaultValue) { - return Optional.ofNullable(System.getProperty(property)) + return getOptionalProperty(property) .map(Boolean::valueOf) .orElse(defaultValue); } + private static Optional getOptionalProperty(String property) { + return Optional.ofNullable(System.getProperty(property)) + .filter(not(String::isEmpty)); + } + public String getSystemTestShortId() { return shortTestId; } @@ -161,8 +172,8 @@ public boolean isForceRedeployInstances() { return forceRedeployInstances; } - private static Builder builder() { - return new Builder(); + public String getForceStateStoreClassname() { + return forceStateStoreClassname; } private static Path findScriptsDir() { @@ -206,6 +217,7 @@ public static final class Builder { private boolean systemTestClusterEnabled; private boolean forceRedeploySystemTest; private boolean forceRedeployInstances; + private String forceStateStoreClassname; private Builder() { } @@ -265,6 +277,11 @@ public Builder forceRedeployInstances(boolean forceRedeployInstances) { return this; } + public Builder forceStateStoreClassname(String forceStateStoreClassname) { + this.forceStateStoreClassname = forceStateStoreClassname; + return this; + } + public SystemTestParameters build() { return new SystemTestParameters(this); } diff --git a/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java b/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java index 6f292ce44f..51f78bfbfc 100644 --- a/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java +++ b/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java @@ -144,7 +144,7 @@ private TableProperties getTableProperties(String tableName) { } public StateStore getStateStore(String tableName) { - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, null); return stateStoreProvider.getStateStore(getTableProperties(tableName)); } @@ -171,7 +171,7 @@ public void beforeAll(ExtensionContext context) throws Exception { TableProperties tableProperties = createTable( instanceProperties, tableDefinition); - StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties); + StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, null); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); InitialiseStateStore initialiseStateStore = InitialiseStateStore .createInitialiseStateStoreFromSplitPoints(tableDefinition.schema, stateStore, tableDefinition.splitPoints); diff --git a/scripts/test/nightly/runTests.sh b/scripts/test/nightly/runTests.sh index f16ce4cb7a..9089bb4899 100755 --- a/scripts/test/nightly/runTests.sh +++ b/scripts/test/nightly/runTests.sh @@ -1,4 +1,5 @@ #!/usr/bin/env bash +# # Copyright 2022-2023 Crown Copyright # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# set -e unset CDPATH @@ -32,11 +34,11 @@ VPC=$1 SUBNETS=$2 RESULTS_BUCKET=$3 if [ "$4" == "performance" ]; then - EXTRA_MAVEN_PARAMS="-Dsleeper.system.test.cluster.enabled=true" - TEST_NAME="performance" + TEST_SUITE_PARAMS="-Dsleeper.system.test.cluster.enabled=true" + TEST_SUITE_NAME="performance" elif [ "$4" == "functional" ]; then - EXTRA_MAVEN_PARAMS="-Dsleeper.system.test.cluster.enabled=false" - TEST_NAME="functional" + TEST_SUITE_PARAMS="-Dsleeper.system.test.cluster.enabled=false" + TEST_SUITE_NAME="functional" else echo "Invalid test type: $4" echo "Valid test types are: performance, functional" @@ -46,7 +48,7 @@ source "$SCRIPTS_DIR/functions/timeUtils.sh" source "$SCRIPTS_DIR/functions/systemTestUtils.sh" START_TIMESTAMP=$(record_time) START_TIME=$(recorded_time_str "$START_TIMESTAMP" "%Y%m%d-%H%M%S") -OUTPUT_DIR="/tmp/sleeper/${TEST_NAME}Tests/$START_TIME" +OUTPUT_DIR="/tmp/sleeper/${TEST_SUITE_NAME}Tests/$START_TIME" mkdir -p "$OUTPUT_DIR" ../build/buildForTest.sh @@ -56,6 +58,8 @@ set +e runMavenSystemTests() { SHORT_ID=$1 + TEST_NAME=$2 + EXTRA_MAVEN_PARAMS=$3 mkdir "$OUTPUT_DIR/$TEST_NAME" ./maven/deployTest.sh "$SHORT_ID" "$VPC" "$SUBNETS" \ -Dsleeper.system.test.output.dir="$OUTPUT_DIR/$TEST_NAME" \ @@ -77,7 +81,8 @@ runMavenSystemTests() { ./maven/tearDown.sh "$SHORT_ID" "${INSTANCE_IDS[@]}" &> "$OUTPUT_DIR/$TEST_NAME.tearDown.log" } -runMavenSystemTests "mvn-$START_TIME" +runMavenSystemTests "mvn-$START_TIME" $TEST_SUITE_NAME $TEST_SUITE_PARAMS +runMavenSystemTests "s3-$START_TIME" s3-state-store -Dsleeper.system.test.force.statestore.classname=sleeper.statestore.s3.S3StateStore echo "[$(time_str)] Uploading test output" java -cp "${SYSTEM_TEST_JAR}" \