diff --git a/docs/12-design.md b/docs/12-design.md
index ca56ebfa91..9f907426d7 100644
--- a/docs/12-design.md
+++ b/docs/12-design.md
@@ -171,6 +171,13 @@ contain the updated information. As two processes may attempt to update the info
 to be a consistency mechanism to ensure that only one update can succeed. A table in DynamoDB is used as this
 consistency layer.
 
+### Potential alternatives
+
+We are considering alternative designs for the state store:
+
+- [A transaction log stored in DynamoDB, with snapshots in S3](designs/transaction-log-state-store.md)
+- [A PostgreSQL database](designs/postgresql-state-store.md)
+
 ## Ingest of data
 
 To ingest data to a table, it is necessary to write files of sorted records. Each file should contain data for one
diff --git a/docs/designs/postgresql-state-store.md b/docs/designs/postgresql-state-store.md
new file mode 100644
index 0000000000..4d7088c285
--- /dev/null
+++ b/docs/designs/postgresql-state-store.md
@@ -0,0 +1,77 @@
+# Use PostgreSQL for the state store
+
+## Status
+
+Proposed
+
+## Context
+
+We have two implementations of the state store that tracks partitions and files in a Sleeper table. It takes some
+effort to keep both working as the system changes, and both have problems.
+
+The DynamoDB state store holds partitions and files as individual items in DynamoDB tables. This means that updates
+which affect many items at once must be split into separate transactions, so we can't always apply changes as
+atomically or quickly as we would like. There's also a consistency issue when working with many items at once: as we
+page through items to load them into memory, the data may change in DynamoDB in between pages.
+
+The S3 state store keeps one file for partitions and one for files, both in an S3 bucket. A DynamoDB table is used to
+track the current revision of each file, and each change means writing a whole new file. This means that each change
+takes some time to process, and if two changes happen to the same file at once, one of them backs out and has to
+retry. Under contention, many retries may happen. It's common for updates to fail entirely after too many retries, or
+to take a long time.
+
+## Design Summary
+
+Store the file and partition state in a PostgreSQL database, with a similar structure to the DynamoDB state store.
+
+The database schema may be more normalised than the DynamoDB equivalent. We can consider this during prototyping.
+
+## Consequences
+
+With a relational database, large queries can be made to present a consistent view of the data. This could avoid the
+consistency issue we have with DynamoDB, but would come with some costs:
+
+- Transaction management and locking
+- A server-based deployment model
+
+### Transaction management and locking
+
+With a relational database, larger transactions involve locking many records. If a larger transaction takes a
+significant amount of time, these locks may produce waiting or conflicts. A relational database is similar to DynamoDB
+in that each record needs to be updated individually. It's not clear whether this may result in slower performance
+than we would like, deadlocks, or other contention issues.
+
+Since PostgreSQL supports larger queries with joins across tables, it should be possible to produce a consistent view
+of large amounts of data, in contrast to DynamoDB.
+
+If we wanted to replicate DynamoDB's conditional updates, one way would be to make a query to check the condition,
+and perform an update within the same transaction. This may result in problems with transaction isolation.
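+
+As a rough sketch, that check-then-update pattern might look like this with JDBC. The table and column names are
+hypothetical, purely for illustration, and a `javax.sql.DataSource` and the identifiers are assumed to be in scope:
+
+```java
+// Sketch: conditional update via check-then-update in one transaction.
+// Assumes a hypothetical file_reference table. Unsafe at the default
+// isolation level, for the reasons described below.
+try (Connection connection = dataSource.getConnection()) {
+    connection.setAutoCommit(false);
+    try (PreparedStatement check = connection.prepareStatement(
+            "SELECT job_id FROM file_reference WHERE table_id = ? AND filename = ?")) {
+        check.setString(1, tableId);
+        check.setString(2, filename);
+        try (ResultSet results = check.executeQuery()) {
+            if (!results.next() || results.getString("job_id") != null) {
+                connection.rollback(); // Condition failed: file missing or already assigned to a job
+                throw new IllegalStateException("Condition failed for file " + filename);
+            }
+        }
+    }
+    try (PreparedStatement update = connection.prepareStatement(
+            "UPDATE file_reference SET job_id = ? WHERE table_id = ? AND filename = ?")) {
+        update.setString(1, jobId);
+        update.setString(2, tableId);
+        update.setString(3, filename);
+        update.executeUpdate();
+    }
+    connection.commit();
+}
+```
+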
+PostgreSQL defaults to the read committed isolation level. This means that if you make multiple queries during one
+transaction, the database may change in between those queries. By default, checking state before an update does not
+produce a conditional update as in DynamoDB.
+
+With higher levels of transaction isolation, you can produce the same behaviour as a conditional update in DynamoDB.
+If a conflicting update occurs at the same time, this will produce a serialization failure, which would require you
+to retry the update. There may be other solutions to this problem, but this may push us towards keeping transactions
+as small as possible.
+
+See the PostgreSQL manual on transaction isolation levels:
+
+https://www.postgresql.org/docs/current/transaction-iso.html
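+
+As a sketch, that retry might wrap the check-then-update above in a serializable transaction, retrying on
+PostgreSQL's serialization_failure SQLState (40001). The checkAndUpdate method and MAX_ATTEMPTS are hypothetical:
+
+```java
+// Sketch: retry a SERIALIZABLE transaction on serialization failure.
+int attempts = 0;
+while (true) {
+    try (Connection connection = dataSource.getConnection()) {
+        connection.setAutoCommit(false);
+        connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+        checkAndUpdate(connection); // The conditional update sketched above
+        connection.commit();
+        break;
+    } catch (SQLException e) {
+        if ("40001".equals(e.getSQLState()) && ++attempts < MAX_ATTEMPTS) {
+            continue; // A conflicting update won: try again from the start
+        }
+        throw e;
+    }
+}
+```
+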
+### Deployment model
+
+PostgreSQL operates as a cluster of individual server nodes, in contrast to DynamoDB's serverless model. We can
+mitigate the operational cost of this by using a managed service with automatic scaling.
+
+Aurora Serverless v2 supports automatic scaling up and down between minimum and maximum limits. If we know Sleeper
+will be idle for a while, we could stop the database and then only be charged for the storage. We already have a
+concept of pausing Sleeper so that the periodic lambdas don't run. With Aurora Serverless this wouldn't be too much
+different. See the AWS documentation:
+
+https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.how-it-works.html
+
+This has some differences from the rest of Sleeper, which is designed to scale to zero by default. Aurora Serverless
+v2 does not support scaling to zero. This means there would be some persistent costs unless we explicitly pause the
+Sleeper instance and stop the database entirely.
diff --git a/docs/designs/transaction-log-state-store.md b/docs/designs/transaction-log-state-store.md
new file mode 100644
index 0000000000..85dd7c2521
--- /dev/null
+++ b/docs/designs/transaction-log-state-store.md
@@ -0,0 +1,158 @@
+# Store a transaction log for the state store
+
+## Status
+
+Proposed
+
+## Context
+
+We have two implementations of the state store that tracks partitions and files in a Sleeper table. It takes some
+effort to keep both working as the system changes, and both have problems.
+
+The DynamoDB state store holds partitions and files as individual items in DynamoDB tables. This means that updates
+which affect many items at once must be split into separate transactions, so we can't always apply changes as
+atomically or quickly as we would like. There's also a consistency issue when working with many items at once: as we
+page through items to load them into memory, the data may change in DynamoDB in between pages.
+
+The S3 state store keeps one file for partitions and one for files, both in an S3 bucket. A DynamoDB table is used to
+track the current revision of each file, and each change means writing a whole new file. This means that each change
+takes some time to process, and if two changes happen to the same file at once, one of them backs out and has to
+retry. Under contention, many retries may happen. It's common for updates to fail entirely after too many retries, or
+to take a long time.
+
+## Design Summary
+
+Implement the state store using an event sourced model, storing a transaction log as well as snapshots of the state.
+
+Store the transactions as items in DynamoDB. Store snapshots as S3 files.
+
+The transaction log DynamoDB table has a hash key of the table ID, and a range key of the transaction number, so
+that transactions are ordered. Use a conditional check to ensure the transaction number being set has not already
+been used.
+
+The snapshots DynamoDB table holds a reference to the latest snapshot held in S3, similar to the S3 state store's
+revisions table. This also holds the transaction number that the snapshot was derived from.
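+
+A sketch of that conditional check with the AWS SDK for Java v2, using types from
+software.amazon.awssdk.services.dynamodb.model and a DynamoDbClient in scope. The table and attribute names are
+illustrative, not a settled schema:
+
+```java
+// Sketch: append a transaction, checking its number has not already been used.
+// TABLE_ID is the hash key and TRANSACTION_NUMBER the range key, so checking
+// that the range key is absent ensures the number is unused for this table.
+PutItemRequest request = PutItemRequest.builder()
+        .tableName("sleeper-transaction-log")
+        .item(Map.of(
+                "TABLE_ID", AttributeValue.fromS(sleeperTableId),
+                "TRANSACTION_NUMBER", AttributeValue.fromN(Long.toString(transactionNumber)),
+                "BODY", AttributeValue.fromS(serialisedTransaction)))
+        .conditionExpression("attribute_not_exists(TRANSACTION_NUMBER)")
+        .build();
+try {
+    dynamoClient.putItem(request);
+} catch (ConditionalCheckFailedException e) {
+    // Another process used this number first: catch up from the log and retry
+}
+```
+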
+## Consequences
+
+This should result in a similar update process to the S3 state store, but without the need to save or load the whole
+state at once. Since we only need to save one item per transaction, this may also result in quicker updates than the
+DynamoDB state store. This would use a different set of patterns from those where the source of truth is a store of
+the current state, and we'll look at some of the implications.
+
+We'll look at how to model the state as derived from the transaction log, independent of the underlying store. To
+avoid reading every transaction every time, we can store a snapshot of the state, and start from a certain point in
+the log.
+
+We'll look at how to achieve ordering and durability of transactions. This is a slightly different approach for
+distributed updates, and there are potential problems in how we store the transaction log.
+
+We'll look at some applications for parallel models or storage formats, as this approach makes it easier to derive
+different formats for the state. This can allow for queries instead of loading the whole state at once, or we can
+model the data in some alternative ways for various purposes.
+
+We'll also look at how this compares to an approach based on a relational database.
+
+### Modelling state
+
+The simplest approach is to hold a model in memory for the whole state of a Sleeper table. We can use this one,
+local model for any updates or queries, and bring it up to date based on the ordered sequence of transactions. We
+can support any transaction that we can apply to the model in memory.
+
+Whenever a change occurs, we create a transaction. Anywhere that holds the model can bring itself up to date by
+reading only the transactions it hasn't seen yet, starting after the latest transaction that's already been applied
+locally. With DynamoDB, consistent reads can enforce that you're really up to date.
+
+We can also skip to a certain point in the transaction log. We can have a separate process whose job is to write
+regular snapshots of the model. This can run every few minutes, and write a copy of the whole model to S3, with a
+pointer to it in DynamoDB, similar to the S3 state store's revision table. This lets us get up to date without
+reading the whole transaction log: we load the snapshot of the model, then load the transactions that have happened
+since the snapshot, and apply them in memory, as in the sketch below.
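+
+A sketch of that catch-up logic, where the model, snapshot store and transaction log types are all hypothetical:
+
+```java
+// Sketch: bring a local model up to date from the latest snapshot, plus any
+// transactions recorded after the snapshot was taken.
+StateModel loadLatestState(String sleeperTableId) {
+    Snapshot snapshot = snapshotStore.loadLatestSnapshot(sleeperTableId); // From S3, via the DynamoDB pointer
+    StateModel model = snapshot.getModel();
+    long lastTransactionNumber = snapshot.getTransactionNumber();
+    // Read only transactions numbered after the snapshot, in order, with a
+    // consistent read so we know we're really up to date.
+    for (Transaction transaction : transactionLog.readTransactionsAfter(sleeperTableId, lastTransactionNumber)) {
+        transaction.apply(model);
+    }
+    return model;
+}
+```
+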
+### Transaction size
+
+A DynamoDB item can have a maximum size of 400KB. It's unlikely a single transaction will exceed that, but we'd have
+to guard against it. We can either pre-emptively split large transactions into smaller ones that we know will fit in
+a DynamoDB item, or we can catch the exception from DynamoDB when an item is too large, and deal with it some other
+way.
+
+To split a transaction into smaller ones that will fit, we would need support in our model for splitting a
+transaction without affecting the aspects of atomicity which matter to the system.
+
+An alternative would be to detect that a transaction is too big, write it to a file in S3, and store just a pointer
+to that file in DynamoDB. This could be significantly slower than a standard DynamoDB update, and may slow down
+reading the transaction log.
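+
+For instance, the guard could be a size check before the write, falling back to an S3 object. The names and the
+threshold here are illustrative:
+
+```java
+// Sketch: store the transaction body inline when it fits comfortably under
+// DynamoDB's 400KB item limit, otherwise offload the body to S3.
+final int maxInlineBytes = 350 * 1024; // Leave room for the key attributes
+Map<String, AttributeValue> item = new HashMap<>();
+item.put("TABLE_ID", AttributeValue.fromS(sleeperTableId));
+item.put("TRANSACTION_NUMBER", AttributeValue.fromN(Long.toString(transactionNumber)));
+byte[] body = serialisedTransaction.getBytes(StandardCharsets.UTF_8);
+if (body.length <= maxInlineBytes) {
+    item.put("BODY", AttributeValue.fromS(serialisedTransaction));
+} else {
+    String key = sleeperTableId + "/transactions/" + transactionNumber + ".json";
+    s3Client.putObject(builder -> builder.bucket(transactionLogBucket).key(key),
+            RequestBody.fromBytes(body));
+    item.put("BODY_S3_KEY", AttributeValue.fromS(key)); // Pointer instead of the inline body
+}
+```
+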
+### Distributed updates and ordering
+
+#### Immediate ordering approach
+
+To achieve ordered, durable updates, we can give each transaction a number. When we add a transaction, we use the
+next number in sequence after the current latest transaction. We use a conditional check to refuse the update if
+there's already a transaction with that number, and retry if we're out of date.
+
+This retry is comparable to an update in the S3 state store, but you don't need to store the whole state, and you
+don't need to reload the whole state each time. Instead, you read the transactions you haven't seen yet and apply
+them to your local model. As in the S3 implementation, you perform a conditional check on your local model before
+saving the update. After your new transaction is saved, you could apply it to your local model as well, and keep the
+model in memory to reuse for other updates or queries.
+
+There are still potential concurrency issues with this approach, since retries are still required under contention.
+We don't know for sure whether this will reduce contention issues by only a few percent relative to the S3 state
+store (in which case the transaction log approach doesn't solve the problem), or eliminate them completely. Since
+each update is smaller, it should be quicker. We could prototype this to gauge whether it will be e.g. 5% quicker or
+5x quicker.
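+
+Putting these pieces together, the update might look like the following sketch, reusing the hypothetical types and
+the conditional put from earlier:
+
+```java
+// Sketch: optimistic append. On a collision, catch up from the log and retry
+// with the next transaction number.
+void addTransaction(String sleeperTableId, Transaction transaction) {
+    StateModel model = loadLatestState(sleeperTableId); // Snapshot plus newer transactions
+    long nextNumber = model.getLastTransactionNumber() + 1;
+    while (true) {
+        transaction.checkConditions(model); // e.g. the input files are not yet assigned to a job
+        try {
+            transactionLog.putTransactionIfNumberUnused(sleeperTableId, nextNumber, transaction);
+            transaction.apply(model); // Keep the local model current for reuse
+            return;
+        } catch (ConditionalCheckFailedException e) {
+            // Another writer used this number: apply the transactions we missed
+            for (Transaction missed : transactionLog.readTransactionsAfter(sleeperTableId, nextNumber - 1)) {
+                missed.apply(model);
+                nextNumber = missed.getNumber() + 1;
+            }
+        }
+    }
+}
+```
+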
+#### Eventual consistency approach
+
+If we wanted to avoid this retry, there is an alternative approach where we store the transaction immediately. To
+build the primary key, you could take a local timestamp at the writer and append some random data to the end, and
+use that to order the transactions. This would break ties between transactions that happen at the same time, and a
+reader after the fact would see a consistent view of which one happened first. We could then store a transaction
+without checking for any other transactions being written at the same time.
+
+This produces a durability problem: if two writers' clocks are out of sync, one of them can insert a transaction
+into the log in the past, from the other writer's point of view. If two updates are mutually exclusive, one of them
+may be inserted before the previous update, causing the original update to be lost. The first writer may believe its
+update was successful because some time passed before the second writer inserted a transaction ahead of it.
+
+We could design the system to allow for some slack and recover from transactions being undone over a short time
+period. This would be complicated to achieve, although it may allow for improved performance as updates don't need
+to wait. The increase in complexity means this may not be as practical as an approach where a full ordering is
+established immediately.
+
+### Parallel models
+
+So far we've assumed that we'll always work with the entire state of a Sleeper table at once, with one model. With a
+transaction log it can be more practical to add alternative models for reads or updates.
+
+#### DynamoDB queries
+
+The DynamoDB state store has advantages for queries, as we only need to read the relevant parts of the state. If we
+want to retain this benefit, we could store the same DynamoDB structure we use now.
+
+Similar to the process for S3 snapshots, we could regularly store a snapshot of the Sleeper table state as items in
+DynamoDB tables, in whatever format is convenient for queries. One option would be to use the same tables as for the
+DynamoDB state store, but use a snapshot ID instead of the table ID.
+
+If we want this view to be 100% up to date, then when we perform a query we could still read the latest transactions
+that have happened since the snapshot, and include that data in the result.
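+
+As a sketch, such a query might load only the relevant snapshot items and overlay the newer transactions. The types
+and methods are hypothetical:
+
+```java
+// Sketch: query the file references in one partition from the latest DynamoDB
+// snapshot, then apply transactions recorded since that snapshot was taken.
+List<FileReference> filesInPartition(String sleeperTableId, String partitionId) {
+    SnapshotPointer pointer = snapshotStore.getLatestPointer(sleeperTableId);
+    // Query only the relevant items, keyed by snapshot ID rather than table ID
+    StateModel partial = StateModel.fromFiles(
+            dynamoSnapshot.queryFilesInPartition(pointer.getSnapshotId(), partitionId));
+    for (Transaction transaction : transactionLog.readTransactionsAfter(
+            sleeperTableId, pointer.getTransactionNumber())) {
+        transaction.apply(partial); // Transactions touching other partitions won't change this view
+    }
+    return partial.getFilesInPartition(partitionId);
+}
+```
+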
+#### Status stores for reporting
+
+If we capture events related to jobs as transactions in the log, we could produce a separate model from the same
+transactions that shows which jobs have occurred, with every detail we track about them in the state store.
+
+This could unify some updates to jobs that are currently made in a separate reporting status store, but which we
+would ideally like to happen simultaneously with a change in the state store, e.g. a compaction job finishing.
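+
+As a sketch, a reporting model could fold only job-related events out of the same log, again with hypothetical
+types:
+
+```java
+// Sketch: derive a job status report from the transaction log, ignoring
+// transactions that carry no job-related events.
+JobStatusReport buildJobReport(String sleeperTableId) {
+    JobStatusReport report = new JobStatusReport();
+    for (Transaction transaction : transactionLog.readTransactionsAfter(sleeperTableId, 0)) {
+        if (transaction instanceof CompactionJobEvent) {
+            CompactionJobEvent event = (CompactionJobEvent) transaction;
+            report.add(event.getJobId(), event.getStatusUpdate());
+        }
+    }
+    return report;
+}
+```
+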
+#### Update models
+
+If we ever decide to avoid holding the whole Sleeper table state in memory, we could create an alternative model to
+apply a single update. Rather than hold the entire state in memory, we could load just the relevant state to perform
+the conditional check, e.g. from a DynamoDB queryable snapshot. When we bring this model up to date from the
+transaction log, we can ignore transactions that are not relevant to the update.
+
+This would add complexity to the way we model the table state, so we may prefer to avoid it. It is an option we
+could consider.
+
+## Resources
+
+- [Martin Fowler's article on event sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)
+- [Greg Young's talk on event sourcing](https://www.youtube.com/watch?v=LDW0QWie21s)
diff --git a/java/pom.xml b/java/pom.xml
index 956194337c..9548a6bf0d 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -110,7 +110,7 @@ <!-- jungrapht-visualization-samples also declares an old version, which is a dependency of the build module.
             -->
         <logback.version>1.4.14</logback.version>
         <aws-java-sdk.version>1.12.498</aws-java-sdk.version>
-        <aws-java-sdk-v2.version>2.20.95</aws-java-sdk-v2.version>
+        <aws-java-sdk-v2.version>2.25.9</aws-java-sdk-v2.version>
         <aws-crt.version>0.22.2</aws-crt.version>
         <aws-lambda-java-events.version>3.11.2</aws-lambda-java-events.version>
         <aws-lambda-java-core.version>1.2.2</aws-lambda-java-core.version>
@@ -123,7 +123,7 @@
         <commons-text.version>1.10.0</commons-text.version>
         <janino.version>3.1.12</janino.version>
         <commons-net.version>3.9.0</commons-net.version>
-        <jackson.version>2.16.2</jackson.version>
+        <jackson.version>2.17.0</jackson.version>
         <!-- Trino integration uses a different version of JJWT, this is the version used in the build module -->
         <jjwt.build.version>0.12.5</jjwt.build.version>
         <facebook.collections.version>0.1.32</facebook.collections.version>
@@ -136,7 +136,7 @@
         <datasketches.version>3.3.0</datasketches.version>
         <slf4j.version>2.0.12</slf4j.version>
         <reload4j.version>1.2.24</reload4j.version>
-        <java-websocket.version>1.5.3</java-websocket.version>
+        <java-websocket.version>1.5.6</java-websocket.version>
         <arrow.version>11.0.0</arrow.version>
         <bouncycastle.version>1.75</bouncycastle.version>
         <athena.version>2023.3.1</athena.version>
@@ -157,7 +157,7 @@
         <junit.version>5.10.2</junit.version>
         <junit.platform.version>1.10.1</junit.platform.version>
         <mockito.version>4.11.0</mockito.version>
-        <testcontainers.version>1.19.0</testcontainers.version>
+        <testcontainers.version>1.19.7</testcontainers.version>
         <wiremock.version>2.35.0</wiremock.version>
         <assertj.version>3.24.1</assertj.version>
         <approvaltests.version>22.3.3</approvaltests.version>
diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java
index e66dcc2923..23a1ac0261 100644
--- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java
+++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/SystemTestGarbageCollection.java
@@ -39,7 +39,7 @@ public SystemTestGarbageCollection invoke() {
     }
 
     public void waitFor() {
-        WaitForGC.waitUntilNoUnreferencedFiles(instance.getStateStore(),
+        WaitForGC.waitUntilNoUnreferencedFiles(instance,
                 PollWithRetries.intervalAndPollingTimeout(
                         Duration.ofSeconds(5), Duration.ofSeconds(30)));
     }
diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/WaitForGC.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/WaitForGC.java
index d9cf9e2632..107f367791 100644
--- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/WaitForGC.java
+++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/gc/WaitForGC.java
@@ -15,27 +15,37 @@
  */
 package sleeper.systemtest.dsl.gc;
 
+import sleeper.configuration.properties.table.TableProperties;
 import sleeper.core.statestore.StateStore;
 import sleeper.core.statestore.StateStoreException;
 import sleeper.core.util.PollWithRetries;
+import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static sleeper.configuration.properties.table.TableProperty.TABLE_ID;
 
 public class WaitForGC {
 
     private WaitForGC() {
     }
 
-    public static void waitUntilNoUnreferencedFiles(StateStore stateStore, PollWithRetries poll) {
+    public static void waitUntilNoUnreferencedFiles(SystemTestInstanceContext instance, PollWithRetries poll) {
+        Map<String, TableProperties> tablesById = instance.streamTableProperties()
+                .collect(toMap(table -> table.get(TABLE_ID), table -> table));
         try {
             poll.pollUntil("no unreferenced files are present", () -> {
-                try {
-                    return stateStore.getReadyForGCFilenamesBefore(Instant.now().plus(Duration.ofDays(1)))
-                            .findAny().isEmpty();
-                } catch (StateStoreException e) {
-                    throw new RuntimeException(e);
-                }
+                List<String> emptyTableIds = tablesById.values().stream()
+                        .filter(table -> hasNoUnreferencedFiles(instance.getStateStore(table)))
+                        .map(table -> table.get(TABLE_ID))
+                        .collect(toUnmodifiableList());
+                emptyTableIds.forEach(tablesById::remove);
+                return tablesById.isEmpty();
             });
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -43,4 +53,13 @@ public static void waitUntilNoUnreferencedFiles(StateStore stateStore, PollWithR
         }
     }
 
+    private static boolean hasNoUnreferencedFiles(StateStore stateStore) {
+        try {
+            return stateStore.getReadyForGCFilenamesBefore(Instant.now().plus(Duration.ofDays(1)))
+                    .findAny().isEmpty();
+        } catch (StateStoreException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/instance/MultipleTablesTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/instance/MultipleTablesTest.java
index f45f7a13bb..7128a3635f 100644
--- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/instance/MultipleTablesTest.java
+++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/instance/MultipleTablesTest.java
@@ -19,6 +19,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
 import sleeper.core.partition.PartitionTree;
 import sleeper.core.partition.PartitionsBuilder;
 import sleeper.core.schema.Schema;
@@ -31,8 +32,12 @@
 import java.util.stream.LongStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
+import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
+import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION;
 import static sleeper.configuration.properties.table.TableProperty.PARTITION_SPLIT_THRESHOLD;
 import static sleeper.core.statestore.FilesReportTestHelper.activeAndReadyForGCFiles;
+import static sleeper.core.statestore.FilesReportTestHelper.activeFiles;
 import static sleeper.core.testutils.printers.FileReferencePrinter.printExpectedFilesForAllTables;
 import static sleeper.core.testutils.printers.FileReferencePrinter.printTableFilesExpectingIdentical;
 import static sleeper.core.testutils.printers.PartitionsPrinter.printExpectedPartitionsForAllTables;
@@ -43,6 +48,7 @@
 import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
 import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;
 import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;
+import static sleeper.systemtest.dsl.testutil.SystemTestTableMetricsHelper.tableMetrics;
 
 @InMemoryDslTest
 public class MultipleTablesTest {
@@ -76,6 +82,41 @@ void shouldIngestOneFileToMultipleTables(SleeperSystemTest sleeper) {
                 .allSatisfy((table, files) -> assertThat(files).hasSize(1));
     }
 
+    @Test
+    void shouldCompactAndGCMultipleTables(SleeperSystemTest sleeper) {
+        // Given we have several tables
+        // And we ingest two source files as separate jobs
+        sleeper.tables().createManyWithProperties(NUMBER_OF_TABLES, schema, Map.of(
+                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
+                COMPACTION_FILES_BATCH_SIZE, "2",
+                GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0"));
+        sleeper.sourceFiles()
+                .createWithNumberedRecords(schema, "file1.parquet", LongStream.range(0, 50))
+                .createWithNumberedRecords(schema, "file2.parquet", LongStream.range(50, 100));
+        sleeper.ingest().byQueue()
+                .sendSourceFilesToAllTables("file1.parquet")
+                .sendSourceFilesToAllTables("file2.parquet")
+                .invokeTask().waitForJobs();
+
+        // When we run compaction and GC
+        sleeper.compaction().createJobs(NUMBER_OF_TABLES).invokeTasks(1).waitForJobs();
+        sleeper.garbageCollection().invoke().waitFor();
+
+        // Then all tables should have one active file with the expected records, and none ready for GC
+        assertThat(sleeper.query().byQueue().allRecordsByTable())
+                .hasSize(NUMBER_OF_TABLES)
+                .allSatisfy(((table, records) -> assertThat(records).containsExactlyElementsOf(
+                        sleeper.generateNumberedRecords(schema, LongStream.range(0, 100)))));
+        var tables = sleeper.tables().list();
+        var partitionsByTable = sleeper.partitioning().treeByTable();
+        var filesByTable = sleeper.tableFiles().filesByTable();
+        PartitionTree expectedPartitions = new PartitionsBuilder(schema).singlePartition("root").buildTree();
+        FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(expectedPartitions);
+        assertThat(printTableFilesExpectingIdentical(partitionsByTable, filesByTable))
+                .isEqualTo(printExpectedFilesForAllTables(tables, expectedPartitions,
+                        activeFiles(fileReferenceFactory.rootFile("merged.parquet", 100))));
+    }
+
     @Test
     void shouldSplitPartitionsOfMultipleTables(SleeperSystemTest sleeper) {
         // Given we have several tables with a split threshold of 20
@@ -133,4 +174,32 @@ void shouldSplitPartitionsOfMultipleTables(SleeperSystemTest sleeper) {
                         List.of("root", "L", "R", "LL", "LR", "RL", "RR"))));
     }
 
+    @Test
+    void shouldGenerateMetricsForMultipleTables(SleeperSystemTest sleeper) {
+        // Given we have several tables
+        // And we ingest two source files as separate jobs
+        sleeper.tables().createManyWithProperties(NUMBER_OF_TABLES, schema, Map.of(
+                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
+                COMPACTION_FILES_BATCH_SIZE, "2",
+                GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0"));
+        sleeper.sourceFiles()
+                .createWithNumberedRecords(schema, "file1.parquet", LongStream.range(0, 50))
+                .createWithNumberedRecords(schema, "file2.parquet", LongStream.range(50, 100));
+        sleeper.ingest().byQueue()
+                .sendSourceFilesToAllTables("file1.parquet")
+                .sendSourceFilesToAllTables("file2.parquet")
+                .invokeTask().waitForJobs();
+
+        // When we compute table metrics
+        sleeper.tableMetrics().generate();
+
+        // Then each table has the expected metrics
+        sleeper.tables().forEach(() -> {
+            assertThat(sleeper.tableMetrics().get()).isEqualTo(tableMetrics(sleeper)
+                    .partitionCount(1).leafPartitionCount(1)
+                    .fileCount(2).recordCount(100)
+                    .averageActiveFilesPerPartition(2)
+                    .build());
+        });
+    }
 }
diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java
index b8b80302c4..70dbe1f92b 100644
--- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java
+++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java
@@ -29,7 +29,6 @@
 import sleeper.compaction.testutils.InMemoryCompactionJobStatusStore;
 import sleeper.compaction.testutils.InMemoryCompactionTaskStatusStore;
 import sleeper.configuration.jars.ObjectFactory;
-import sleeper.configuration.properties.table.FixedTablePropertiesProvider;
 import sleeper.configuration.properties.table.TableProperties;
 import sleeper.configuration.properties.table.TablePropertiesProvider;
 import sleeper.core.iterator.CloseableIterator;
@@ -137,12 +136,12 @@ private void createJobs(Mode mode) {
 
     private CreateCompactionJobs jobCreator(Mode mode) {
         return new CreateCompactionJobs(ObjectFactory.noUserJars(), instance.getInstanceProperties(),
-                tablePropertiesProvider(instance), instance.getStateStoreProvider(), jobSender(), jobStore, mode);
+                instance.getTablePropertiesProvider(), instance.getStateStoreProvider(), jobSender(), jobStore, mode);
     }
 
     private void finishJobs(SystemTestInstanceContext instance, String taskId) {
-        TablePropertiesProvider tablesProvider = tablePropertiesProvider(instance);
+        TablePropertiesProvider tablesProvider = instance.getTablePropertiesProvider();
         for (CompactionJob job : queuedJobsById.values()) {
             TableProperties tableProperties = tablesProvider.getById(job.getTableId());
             RecordsProcessedSummary summary = compact(job, tableProperties, instance.getStateStore(tableProperties), taskId);
@@ -212,11 +211,6 @@ private RecordsProcessed mergeInputFiles(CompactionJob job, Partition partition,
                 .sum());
     }
 
-    private static TablePropertiesProvider tablePropertiesProvider(SystemTestInstanceContext instance) {
-        return new FixedTablePropertiesProvider(
-                instance.streamTableProperties().collect(toUnmodifiableList()));
-    }
-
     private CreateCompactionJobs.JobSender jobSender() {
         return job -> queuedJobsById.put(job.getId(), job);
     }
diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/MultipleTablesIT.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/MultipleTablesIT.java
index 4e791e8893..fa72d3b44a 100644
--- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/MultipleTablesIT.java
+++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/MultipleTablesIT.java
@@ -19,12 +19,15 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
 import sleeper.core.partition.PartitionTree;
+import sleeper.core.partition.PartitionsBuilder;
 import sleeper.core.schema.Schema;
 import sleeper.core.statestore.FileReferenceFactory;
 import sleeper.systemtest.dsl.SleeperSystemTest;
 import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues;
 import sleeper.systemtest.suite.fixtures.SystemTestSchema;
+import sleeper.systemtest.suite.testutil.Slow;
 import sleeper.systemtest.suite.testutil.SystemTest;
 
 import java.util.List;
@@ -35,8 +38,12 @@
 import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
 import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.INGEST_JOB_QUEUE_URL;
 import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_JOB_QUEUE_URL;
+import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
+import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
+import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION;
 import static sleeper.configuration.properties.table.TableProperty.PARTITION_SPLIT_THRESHOLD;
 import static sleeper.core.statestore.FilesReportTestHelper.activeAndReadyForGCFiles;
+import static sleeper.core.statestore.FilesReportTestHelper.activeFiles;
 import static sleeper.core.testutils.printers.FileReferencePrinter.printExpectedFilesForAllTables;
 import static sleeper.core.testutils.printers.FileReferencePrinter.printTableFilesExpectingIdentical;
 import static sleeper.core.testutils.printers.PartitionsPrinter.printExpectedPartitionsForAllTables;
@@ -44,13 +51,14 @@
 import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix;
 import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
 import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
+import static sleeper.systemtest.dsl.testutil.SystemTestTableMetricsHelper.tableMetrics;
 import static sleeper.systemtest.suite.fixtures.SystemTestInstance.MAIN;
-import static sleeper.systemtest.suite.testutil.PartitionsTestHelper.partitionsBuilder;
 
 @SystemTest
+@Slow // Slow because compactions run for 200 tables in one task
 public class MultipleTablesIT {
     private final Schema schema = SystemTestSchema.DEFAULT_SCHEMA;
-    private static final int NUMBER_OF_TABLES = 5;
+    private static final int NUMBER_OF_TABLES = 200;
 
     @BeforeEach
     void setUp(SleeperSystemTest sleeper, AfterTestPurgeQueues purgeQueues) {
@@ -88,6 +96,41 @@ void shouldIngestOneFileToMultipleTables(SleeperSystemTest sleeper) {
                 .allSatisfy((table, files) -> assertThat(files).hasSize(1));
     }
 
+    @Test
+    void shouldCompactAndGCMultipleTables(SleeperSystemTest sleeper) {
+        // Given we have several tables
+        // And we ingest two source files as separate jobs
+        sleeper.tables().createManyWithProperties(NUMBER_OF_TABLES, schema, Map.of(
+                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
+                COMPACTION_FILES_BATCH_SIZE, "2",
+                GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0"));
+        sleeper.sourceFiles()
+                .createWithNumberedRecords(schema, "file1.parquet", LongStream.range(0, 50))
+                .createWithNumberedRecords(schema, "file2.parquet", LongStream.range(50, 100));
+        sleeper.ingest().byQueue()
+                .sendSourceFilesToAllTables("file1.parquet")
+                .sendSourceFilesToAllTables("file2.parquet")
+                .invokeTask().waitForJobs();
+
+        // When we run compaction and GC
+        sleeper.compaction().createJobs(NUMBER_OF_TABLES).invokeTasks(1).waitForJobs();
+        sleeper.garbageCollection().invoke().waitFor();
+
+        // Then all tables should have one active file with the expected records, and none ready for GC
+        assertThat(sleeper.query().byQueue().allRecordsByTable())
+                .hasSize(NUMBER_OF_TABLES)
+                .allSatisfy(((table, records) -> assertThat(records).containsExactlyElementsOf(
+                        sleeper.generateNumberedRecords(schema, LongStream.range(0, 100)))));
+        var tables = sleeper.tables().list();
+        var partitionsByTable = sleeper.partitioning().treeByTable();
+        var filesByTable = sleeper.tableFiles().filesByTable();
+        PartitionTree expectedPartitions = new PartitionsBuilder(schema).singlePartition("root").buildTree();
+        FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(expectedPartitions);
+        assertThat(printTableFilesExpectingIdentical(partitionsByTable, filesByTable))
+                .isEqualTo(printExpectedFilesForAllTables(tables, expectedPartitions,
+                        activeFiles(fileReferenceFactory.rootFile("merged.parquet", 100))));
+    }
+
     @Test
     void shouldSplitPartitionsOfMultipleTables(SleeperSystemTest sleeper) {
         // Given we have several tables with a split threshold of 20
@@ -118,7 +161,7 @@ void shouldSplitPartitionsOfMultipleTables(SleeperSystemTest sleeper) {
         var tables = sleeper.tables().list();
         var partitionsByTable = sleeper.partitioning().treeByTable();
         var filesByTable = sleeper.tableFiles().filesByTable();
-        PartitionTree expectedPartitions = partitionsBuilder(schema)
+        PartitionTree expectedPartitions = new PartitionsBuilder(schema)
                 .rootFirst("root")
                 .splitToNewChildren("root", "L", "R", "row-50")
                 .splitToNewChildren("L", "LL", "LR", "row-25")
@@ -144,4 +187,33 @@ void shouldSplitPartitionsOfMultipleTables(SleeperSystemTest sleeper) {
                         fileReferenceFactory.partitionFile("RRR", 13)),
                 List.of("root", "L", "R", "LL", "LR", "RL", "RR"))));
     }
+
+    @Test
+    void shouldGenerateMetricsForMultipleTables(SleeperSystemTest sleeper) {
+        // Given we have several tables
+        // And we ingest two source files as separate jobs
+        sleeper.tables().createManyWithProperties(NUMBER_OF_TABLES, schema, Map.of(
+                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
+                COMPACTION_FILES_BATCH_SIZE, "2",
+                GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "0"));
+        sleeper.sourceFiles()
+                .createWithNumberedRecords(schema, "file1.parquet", LongStream.range(0, 50))
+                .createWithNumberedRecords(schema, "file2.parquet", LongStream.range(50, 100));
+        sleeper.ingest().byQueue()
+                .sendSourceFilesToAllTables("file1.parquet")
+                .sendSourceFilesToAllTables("file2.parquet")
+                .invokeTask().waitForJobs();
+
+        // When we compute table metrics
+        sleeper.tableMetrics().generate();
+
+        // Then each table has the expected metrics
+        sleeper.tables().forEach(() -> {
+            assertThat(sleeper.tableMetrics().get()).isEqualTo(tableMetrics(sleeper)
+                    .partitionCount(1).leafPartitionCount(1)
+                    .fileCount(2).recordCount(100)
+                    .averageActiveFilesPerPartition(2)
+                    .build());
+        });
+    }
 }