Merge branch 'develop' into 2022-rename-instance-properties
patchwork01 authored Mar 14, 2024
2 parents 8e909a8 + 5983126 commit f5b9b0a
Showing 9 changed files with 419 additions and 23 deletions.
7 changes: 7 additions & 0 deletions docs/12-design.md
@@ -171,6 +171,13 @@ contain the updated information. As two processes may attempt to update the info
to be a consistency mechanism to ensure that only one update can succeed. A table in DynamoDB is used as this
consistency layer.

### Potential alternatives

We are considering alternative designs for the state store:

- [A transaction log stored in DynamoDB, with snapshots in S3](designs/transaction-log-state-store.md)
- [A PostgreSQL database](designs/postgresql-state-store.md)

## Ingest of data

To ingest data to a table, it is necessary to write files of sorted records. Each file should contain data for one
77 changes: 77 additions & 0 deletions docs/designs/postgresql-state-store.md
@@ -0,0 +1,77 @@
# Use PostgreSQL for the state store

## Status

Proposed

## Context

We have two implementations of the state store that tracks partitions and files in a Sleeper table. It takes ongoing
effort to keep both working as the system changes, and both have problems.

The DynamoDB state store holds partitions and files as individual items in DynamoDB tables. This means that updates
which affect many items at once require splitting into separate transactions, and we can't always apply changes as
atomically or quickly as we would like. There's also a consistency issue when working with many items at once. As we
page through items to load them into memory, the data may change in DynamoDB in between pages.

The S3 state store keeps one file for partitions and one for files, both in an S3 bucket. A DynamoDB table is used to
track the current revision of each file, and each change means writing a whole new file. This means that each change
takes some time to process, and if two changes happen to the same file at once, one of them backs out and has to retry. Under
contention, many retries may happen. It's common for updates to fail entirely due to too many retries, or to take a long
time.

## Design Summary

Store the file and partition state in a PostgreSQL database, with a similar structure to the DynamoDB state store.

The database schema may be more normalised than the DynamoDB equivalent. We can consider this during prototyping.

## Consequences

With a relational database, large queries can be made to present a consistent view of the data. This could avoid the
consistency issue we have with DynamoDB, but would come with some costs:

- Transaction management and locking
- Server-based deployment model

### Transaction management and locking

With a relational database, larger transactions involve locking many records. If a larger transaction takes a
significant amount of time, these locks may cause other transactions to wait or conflict. A relational database is similar to DynamoDB
in that each record needs to be updated individually. It's not clear whether this may result in slower performance than
we would like, deadlocks, or other contention issues.

Since PostgreSQL supports larger queries with joins across tables, this should make it possible to produce a consistent
view of large amounts of data, in contrast to DynamoDB.

If we wanted to replicate DynamoDB's conditional updates, one way would be to make a query to check the condition, and
perform an update within the same transaction. This may result in problems with transaction isolation.

PostgreSQL defaults to the read committed isolation level. This means that if you make multiple queries during one
transaction, the data may change in between those queries. So by default, querying the state before an update does not
give the same guarantee as a conditional update in DynamoDB.

With higher levels of transaction isolation, you can produce the same behaviour as a conditional update in DynamoDB.
If a conflicting update occurs at the same time, this will produce a serialization failure. This would require you to
retry the update. There may be other solutions to this problem, but this may push us towards keeping transactions as
small as possible.

See the PostgreSQL manual on transaction isolation levels:

https://www.postgresql.org/docs/current/transaction-iso.html
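
As a rough illustration, here is a minimal JDBC sketch of a conditional update run at the serializable isolation level,
retrying on serialization failure. The table and column names (`file`, `reference_count`, `ready_for_gc`) are
illustrative assumptions only, not an agreed schema.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class ConditionalUpdateExample {

    // Sketch of a conditional update: only mark a file as ready for GC if it has no references.
    public static void markFileReadyForGC(String jdbcUrl, String tableId, String filename) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    // Check the condition within the transaction
                    try (PreparedStatement check = conn.prepareStatement(
                            "SELECT reference_count FROM file WHERE table_id = ? AND filename = ?")) {
                        check.setString(1, tableId);
                        check.setString(2, filename);
                        try (ResultSet rs = check.executeQuery()) {
                            if (!rs.next() || rs.getInt("reference_count") > 0) {
                                conn.rollback();
                                throw new IllegalStateException("File still has references: " + filename);
                            }
                        }
                    }
                    // Apply the update in the same transaction
                    try (PreparedStatement update = conn.prepareStatement(
                            "UPDATE file SET ready_for_gc = TRUE WHERE table_id = ? AND filename = ?")) {
                        update.setString(1, tableId);
                        update.setString(2, filename);
                        update.executeUpdate();
                    }
                    conn.commit();
                    return;
                } catch (SQLException e) {
                    conn.rollback();
                    if (!"40001".equals(e.getSQLState())) { // serialization_failure
                        throw e;
                    }
                    // Another transaction conflicted; retry from the start
                }
            }
            throw new SQLException("Gave up after too many serialization failures");
        }
    }
}
```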

### Deployment model

PostgreSQL operates as a cluster of individual server nodes, rather than a serverless service. We can mitigate this by
using a managed service with automatic scaling.

Aurora Serverless v2 supports automatic scaling up and down between minimum and maximum limits. If we know Sleeper will
be idle for a while, we could stop the database and then only be charged for the storage. We already have a concept of
pausing Sleeper so that the periodic lambdas don't run. With Aurora Serverless this wouldn't be much different. See
the AWS documentation:

https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.how-it-works.html

This differs from the rest of Sleeper, which is designed to scale to zero by default. Aurora Serverless v2
does not support scaling to zero. This means there would be some persistent costs unless we explicitly pause the Sleeper
instance and stop the database entirely.
158 changes: 158 additions & 0 deletions docs/designs/transaction-log-state-store.md
@@ -0,0 +1,158 @@
# Store a transaction log for the state store

## Status

Proposed

## Context

We have two implementations of the state store that tracks partitions and files in a Sleeper table. It takes ongoing
effort to keep both working as the system changes, and both have problems.

The DynamoDB state store holds partitions and files as individual items in DynamoDB tables. This means that updates
which affect many items at once require splitting into separate transactions, and we can't always apply changes as
atomically or quickly as we would like. There's also a consistency issue when working with many items at once. As we
page through items to load them into memory, the data may change in DynamoDB in between pages.

The S3 state store keeps one file for partitions and one for files, both in an S3 bucket. A DynamoDB table is used to
track the current revision of each file, and each change means writing a whole new file. This means that each change
takes some time to process, and if two changes happen to the same file at once, one of them backs out and has to retry. Under
contention, many retries may happen. It's common for updates to fail entirely due to too many retries, or to take a long
time.

## Design Summary

Implement the state store using an event sourced model, storing a transaction log as well as snapshots of the state.

Store the transactions as items in DynamoDB. Store snapshots as S3 files.

The transaction log DynamoDB table has a hash key of the table ID, and a range key of the transaction number, so that
transactions are held in order. Use a conditional check to ensure that the transaction number being written has not
already been used.

The snapshots DynamoDB table holds a reference to the latest snapshot held in S3, similar to the S3 state store
revisions table. This also holds the transaction number that the snapshot was derived from.

## Consequences

This should result in a similar update process to the S3 state store, but without the need to save or load the whole
state at once. Since we only need to save one item per transaction, this may also result in quicker updates compared to
the DynamoDB state store. This would use a different set of patterns from those where the source of truth is a store of
the current state, and we'll look at some of the implications.

We'll look at how to model the state as derived from the transaction log, independent of the underlying store. To avoid
reading every transaction every time, we can store a snapshot of the state, and start from a certain point in the log.

We'll look at how to achieve ordering and durability of transactions. This is a slightly different approach for
distributed updates, and there are potential problems in how we store the transaction log.

We'll look at some applications for parallel models or storage formats, as this approach makes it easier to derive
different formats for the state. This can allow for queries instead of loading the whole state at once, or we can model
the data in some alternative ways for various purposes.

We'll also look at how this compares to an approach based on a relational database.

### Modelling state

The simplest approach is to hold a model in memory for the whole state of a Sleeper table. We can use this single local
model for any updates or queries, and bring it up to date based on the ordered sequence of transactions. We can support
any transaction that we can apply to the model in memory.

Whenever a change occurs, we create a transaction. Anywhere that holds the model can bring itself up to date by reading
only the transactions it hasn't seen yet, starting after the latest transaction that's already been applied locally.
With DynamoDB, consistent reads can enforce that you're really up-to-date.

We can also skip to a certain point in the transaction log. We can have a separate process whose job is to write regular
snapshots of the model. This can run every few minutes, and write a copy of the whole model to S3. We can point to it in
DynamoDB, similar to the S3 state store's revision table. This lets us get up to date without reading the whole
transaction log. We can load the snapshot of the model, then load the transactions that have happened since the
snapshot, and apply them in memory.
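
As a minimal sketch of this, assuming hypothetical types for the model, the transactions and the log store, bringing a
local model up to date might look something like this:

```java
import java.util.List;

// Minimal sketch of keeping a local model up to date from the transaction log. All type
// names here are hypothetical, for illustration only.
public class TableStateModel {

    /** A single transaction in the log, applied to the in-memory state. */
    public interface StateTransaction {
        long getNumber();

        void apply(TableState state);
    }

    /** The in-memory model of a Sleeper table's partitions and files. */
    public interface TableState {
        long getLastTransactionNumber();
    }

    /** Reads transactions from the DynamoDB log, ordered by transaction number. */
    public interface TransactionLogStore {
        List<StateTransaction> readTransactionsAfter(long transactionNumber);
    }

    private final TransactionLogStore logStore;
    private final TableState state;
    private long lastTransactionNumber;

    public TableStateModel(TableState snapshot, TransactionLogStore logStore) {
        // Start from a snapshot rather than replaying the whole log
        this.state = snapshot;
        this.lastTransactionNumber = snapshot.getLastTransactionNumber();
        this.logStore = logStore;
    }

    public void bringUpToDate() {
        // Read only the transactions we haven't seen yet (a consistent read in DynamoDB
        // can guarantee we really are up to date), and apply them to the local model.
        for (StateTransaction transaction : logStore.readTransactionsAfter(lastTransactionNumber)) {
            transaction.apply(state);
            lastTransactionNumber = transaction.getNumber();
        }
    }
}
```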

### Transaction size

A DynamoDB item can have a maximum size of 400KB. It's unlikely a single transaction will exceed that, but we'd have to
guard against it. We can either pre-emptively split large transactions into smaller ones that we know will fit in a
DynamoDB item, or we can catch the exception from DynamoDB when an item is too large and deal with that transaction
some other way.

To split a transaction into smaller ones that will fit, we would need to handle this in our model, to split a
transaction without affecting the aspects of atomicity which matter to the system.

An alternative would be to detect that a transaction is too big, and write it to a file in S3 with just a pointer to
that file in DynamoDB. This could be significantly slower than a standard DynamoDB update, and may slow down reading
the transaction log.
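
A minimal sketch of that fallback, assuming an illustrative bucket layout and size threshold, might look like this:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class LargeTransactionExample {

    private static final int MAX_DYNAMO_TRANSACTION_BYTES = 350 * 1024; // margin under the 400KB item limit

    /**
     * Sketch of the fallback described above: if a serialised transaction would not fit in a
     * DynamoDB item, write the body to S3 and store only a pointer in the log item. The bucket
     * and key layout are illustrative assumptions.
     */
    public static String storeTransactionBody(S3Client s3, String bucket, String sleeperTableId, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= MAX_DYNAMO_TRANSACTION_BYTES) {
            return body; // small enough to hold directly in the DynamoDB item
        }
        String key = sleeperTableId + "/transactions/" + UUID.randomUUID();
        s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                RequestBody.fromBytes(bytes));
        return "s3://" + bucket + "/" + key; // pointer stored in the DynamoDB item instead of the body
    }
}
```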

### Distributed updates and ordering

#### Immediate ordering approach

To achieve ordered, durable updates, we can give each transaction a number. When we add a transaction, we use the next
number in sequence after the current latest transaction. We use a conditional check to refuse the update if there's
already a transaction with that number. We then need to retry if we're out of date.
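
A minimal sketch of that conditional check with the AWS SDK v2, assuming illustrative table and attribute names, might
look like this:

```java
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.util.Map;

public class AddTransactionExample {

    /**
     * Sketch of adding a transaction with the next number in sequence. Table and attribute
     * names are illustrative assumptions. Returns false if another process got there first,
     * in which case the caller should read the new transactions, reapply its check and retry.
     */
    public static boolean addTransaction(DynamoDbClient dynamoDB, String sleeperTableId,
            long nextTransactionNumber, String transactionBody) {
        try {
            dynamoDB.putItem(PutItemRequest.builder()
                    .tableName("sleeper-transaction-log")
                    .item(Map.of(
                            "TABLE_ID", AttributeValue.fromS(sleeperTableId),
                            "TRANSACTION_NUMBER", AttributeValue.fromN(Long.toString(nextTransactionNumber)),
                            "BODY", AttributeValue.fromS(transactionBody)))
                    // Refuse the update if a transaction with this number already exists
                    .conditionExpression("attribute_not_exists(TRANSACTION_NUMBER)")
                    .build());
            return true;
        } catch (ConditionalCheckFailedException e) {
            // Out of date: another writer added this transaction number first
            return false;
        }
    }
}
```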

This retry is comparable to an update in the S3 state store, but you don't need to store the whole state. You also don't
need to reload the whole state each time. Instead, you read the transactions you haven't seen yet and apply them to your
local model. As in the S3 implementation, you perform a conditional check on your local model before saving the update.
After your new transaction is saved, you could apply that to your local model as well, and keep it in memory to reuse
for other updates or queries.

There are still potential concurrency issues with this approach, since retries are still required under contention. We
don't know for sure whether this will reduce contention issues by a few percent relative to the S3 state store (in which
case the transaction log approach doesn't solve the problem), or eliminate them completely. Since each update is
smaller, it should be quicker. We could prototype this to gauge whether it will be e.g. 5% quicker or 5x quicker.

#### Eventual consistency approach

If we wanted to avoid this retry, there is an alternative approach that stores the transaction immediately. To build the
primary key, we could take a local timestamp at the writer, append some random data to the end, and use that to order
the transactions. This would break ties between transactions that happen at the same time, so a reader after the fact
would see a consistent view of which one happened first. We could then store the transaction without checking for any
other transactions being written at the same time.
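
A minimal sketch of building such a key, with the exact format as an illustrative assumption, might look like this:

```java
import java.security.SecureRandom;
import java.time.Instant;

public class TransactionKeyExample {

    private static final SecureRandom RANDOM = new SecureRandom();

    /**
     * Sketch of building a sort key from the writer's local clock with random data appended,
     * so that two transactions written at the same millisecond still sort in a fixed order.
     */
    public static String buildTransactionKey(Instant writerTime) {
        // Zero-padded epoch millis keep lexicographic order equal to time order
        return String.format("%020d-%08x", writerTime.toEpochMilli(), RANDOM.nextInt());
    }
}
```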

This produces a durability problem: if two writers' clocks are out of sync, one of them can insert a transaction into
the log in the past, from the other writer's point of view. If two updates are mutually exclusive, the later one may be
inserted before the earlier update in the log, causing the original update to be lost. The first writer may believe its
update succeeded because some time passed before the second writer inserted its transaction earlier in the log.

We could design the system to allow for some slack and recover from transactions being undone over a short time period.
This would be complicated to achieve, although it may allow for improved performance as updates don't need to wait. The
increase in complexity means this may not be as practical as an approach where a full ordering is established
immediately.

### Parallel models

So far we've assumed that we'll always work with the entire state of a Sleeper table at once, with one model. With a
transaction log it can be more practical to add alternative models for read or update.

#### DynamoDB queries

The DynamoDB state store has advantages for queries, as we only need to read the relevant parts of the state. If we
want to retain this benefit, we could store the same DynamoDB structure we use now.

Similar to the process for S3 snapshots, we could regularly store a snapshot of the Sleeper table state as items in
DynamoDB tables, in whatever format is convenient for queries. One option would be to use the same tables as for the
DynamoDB state store, but use a snapshot ID instead of the table ID.

If we want this view to be 100% up to date, then when we perform a query we could still read the latest transactions
that have happened since the snapshot, and include that data in the result.

#### Status stores for reporting

If we capture events related to jobs as transactions in the log, that would allow us to produce a separate model from
the same transactions that can show what jobs have occurred, and every detail we track about them in the state store.

This could unify some updates to jobs that are currently done in a separate reporting status store, which we would
ideally like to happen simultaneously with some change in the state store, e.g. a compaction job finishing.

#### Update models

If we ever decide to avoid holding the whole Sleeper table state in memory, we could create an alternative model to
apply a single update. Rather than hold the entire state in memory, we could load just the relevant state to perform the
conditional check, e.g. from a DynamoDB queryable snapshot. When we bring this model up to date from the transaction log,
we can ignore transactions that are not relevant to the update.

This would add complexity to the way we model the table state, so we may prefer to avoid this. It is an option we could
consider.

## Resources

- [Martin Fowler's article on event sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)
- [Greg Young's talk on event sourcing](https://www.youtube.com/watch?v=LDW0QWie21s)
8 changes: 4 additions & 4 deletions java/pom.xml
@@ -110,7 +110,7 @@
<!-- jungrapht-visualization-samples also declares an old version, which is a dependency of the build module. -->
<logback.version>1.4.14</logback.version>
<aws-java-sdk.version>1.12.498</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.20.95</aws-java-sdk-v2.version>
<aws-java-sdk-v2.version>2.25.9</aws-java-sdk-v2.version>
<aws-crt.version>0.22.2</aws-crt.version>
<aws-lambda-java-events.version>3.11.2</aws-lambda-java-events.version>
<aws-lambda-java-core.version>1.2.2</aws-lambda-java-core.version>
@@ -123,7 +123,7 @@
<commons-text.version>1.10.0</commons-text.version>
<janino.version>3.1.12</janino.version>
<commons-net.version>3.9.0</commons-net.version>
<jackson.version>2.16.2</jackson.version>
<jackson.version>2.17.0</jackson.version>
<!-- Trino integration uses a different version of JJWT, this is the version used in the build module -->
<jjwt.build.version>0.12.5</jjwt.build.version>
<facebook.collections.version>0.1.32</facebook.collections.version>
@@ -136,7 +136,7 @@
<datasketches.version>3.3.0</datasketches.version>
<slf4j.version>2.0.12</slf4j.version>
<reload4j.version>1.2.24</reload4j.version>
<java-websocket.version>1.5.3</java-websocket.version>
<java-websocket.version>1.5.6</java-websocket.version>
<arrow.version>11.0.0</arrow.version>
<bouncycastle.version>1.75</bouncycastle.version>
<athena.version>2023.3.1</athena.version>
@@ -157,7 +157,7 @@
<junit.version>5.10.2</junit.version>
<junit.platform.version>1.10.1</junit.platform.version>
<mockito.version>4.11.0</mockito.version>
<testcontainers.version>1.19.0</testcontainers.version>
<testcontainers.version>1.19.7</testcontainers.version>
<wiremock.version>2.35.0</wiremock.version>
<assertj.version>3.24.1</assertj.version>
<approvaltests.version>22.3.3</approvaltests.version>
@@ -39,7 +39,7 @@ public SystemTestGarbageCollection invoke() {
}

public void waitFor() {
WaitForGC.waitUntilNoUnreferencedFiles(instance.getStateStore(),
WaitForGC.waitUntilNoUnreferencedFiles(instance,
PollWithRetries.intervalAndPollingTimeout(
Duration.ofSeconds(5), Duration.ofSeconds(30)));
}
@@ -15,32 +15,51 @@
*/
package sleeper.systemtest.dsl.gc;

import sleeper.configuration.properties.table.TableProperties;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.table.TableProperty.TABLE_ID;

public class WaitForGC {

private WaitForGC() {
}

public static void waitUntilNoUnreferencedFiles(StateStore stateStore, PollWithRetries poll) {
public static void waitUntilNoUnreferencedFiles(SystemTestInstanceContext instance, PollWithRetries poll) {
Map<String, TableProperties> tablesById = instance.streamTableProperties()
.collect(toMap(table -> table.get(TABLE_ID), table -> table));
try {
poll.pollUntil("no unreferenced files are present", () -> {
try {
return stateStore.getReadyForGCFilenamesBefore(Instant.now().plus(Duration.ofDays(1)))
.findAny().isEmpty();
} catch (StateStoreException e) {
throw new RuntimeException(e);
}
List<String> emptyTableIds = tablesById.values().stream()
.filter(table -> hasNoUnreferencedFiles(instance.getStateStore(table)))
.map(table -> table.get(TABLE_ID))
.collect(toUnmodifiableList());
emptyTableIds.forEach(tablesById::remove);
return tablesById.isEmpty();
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

private static boolean hasNoUnreferencedFiles(StateStore stateStore) {
try {
return stateStore.getReadyForGCFilenamesBefore(Instant.now().plus(Duration.ofDays(1)))
.findAny().isEmpty();
} catch (StateStoreException e) {
throw new RuntimeException(e);
}
}

}
