
Commit e4fc448
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-metrics-for-all-tables
patchwork01 committed Sep 28, 2023
2 parents d70eae6 + 72a3960 commit e4fc448
Showing 36 changed files with 353 additions and 98 deletions.
2 changes: 1 addition & 1 deletion NOTICES
@@ -37,7 +37,7 @@ Apache Ivy (org.apache.ivy:ivy:2.5.2):
 
 - Apache License, Version 2.0
 
-Snappy Java (org.xerial.snappy:snappy-java:1.1.10.1):
+Snappy Java (org.xerial.snappy:snappy-java:1.1.10.4):
 
 - Apache License, Version 2.0
@@ -47,6 +47,7 @@
 import com.google.gson.Gson;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,7 +106,7 @@ public SleeperMetadataHandler(AmazonS3 s3Client, AmazonDynamoDB dynamoDBClient,
         this.instanceProperties = new InstanceProperties();
         this.instanceProperties.loadFromS3(s3Client, configBucket);
         this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties);
-        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties);
+        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, new Configuration());
     }
 
     public SleeperMetadataHandler(AmazonS3 s3Client,
@@ -121,7 +122,7 @@ public SleeperMetadataHandler(AmazonS3 s3Client,
         this.instanceProperties = new InstanceProperties();
         this.instanceProperties.loadFromS3(s3Client, configBucket);
         this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties);
-        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties);
+        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties, new Configuration());
     }
 
     /**
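Both constructors change the same way, and the pattern repeats across this commit: StateStoreProvider now takes a Hadoop Configuration, which the S3-backed state store needs to read and write its state files. A minimal sketch of the new call shape, with Sleeper import paths inferred from this diff (the wrapper class and method are illustrative):

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import org.apache.hadoop.conf.Configuration;

    import sleeper.configuration.properties.instance.InstanceProperties;
    import sleeper.configuration.properties.table.TableProperties;
    import sleeper.core.statestore.StateStore;
    import sleeper.statestore.StateStoreProvider;

    public class StateStoreExample {
        // Builds a provider with an explicit Hadoop configuration, then asks it
        // for one table's state store. The provider hands back whichever
        // implementation the table's STATESTORE_CLASSNAME property selects.
        public static StateStore loadStateStore(AmazonDynamoDB dynamoDBClient,
                                                InstanceProperties instanceProperties,
                                                TableProperties tableProperties) {
            StateStoreProvider provider =
                    new StateStoreProvider(dynamoDBClient, instanceProperties, new Configuration());
            return provider.getStateStore(tableProperties);
        }
    }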
2 changes: 1 addition & 1 deletion java/athena/src/test/java/sleeper/athena/TestUtils.java
@@ -99,7 +99,7 @@ public static void ingestData(AmazonDynamoDB dynamoClient, AmazonS3 s3Client, St
         IngestFactory factory = IngestFactory.builder()
                 .objectFactory(ObjectFactory.noUserJars())
                 .localDir(dataDir)
-                .stateStoreProvider(new StateStoreProvider(dynamoClient, instanceProperties))
+                .stateStoreProvider(new StateStoreProvider(dynamoClient, instanceProperties, new Configuration()))
                 .hadoopConfiguration(new Configuration())
                 .instanceProperties(instanceProperties)
                 .build();
@@ -24,6 +24,7 @@
 import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest;
 import com.amazonaws.services.securitytoken.model.GetCallerIdentityResult;
 import com.google.gson.JsonSyntaxException;
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@
 import sleeper.ingest.job.status.IngestJobStatusStore;
 import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory;
 import sleeper.statestore.StateStoreProvider;
+import sleeper.utils.HadoopConfigurationProvider;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -78,17 +80,17 @@ public BulkImportJobDriver(SessionRunner sessionRunner,
     }
 
     public static BulkImportJobDriver from(BulkImportJobRunner jobRunner, InstanceProperties instanceProperties,
-                                           AmazonS3 s3Client, AmazonDynamoDB dynamoClient) {
-        return from(jobRunner, instanceProperties, s3Client, dynamoClient,
+                                           AmazonS3 s3Client, AmazonDynamoDB dynamoClient, Configuration conf) {
+        return from(jobRunner, instanceProperties, s3Client, dynamoClient, conf,
                 IngestJobStatusStoreFactory.getStatusStore(dynamoClient, instanceProperties), Instant::now);
     }
 
     public static BulkImportJobDriver from(BulkImportJobRunner jobRunner, InstanceProperties instanceProperties,
-                                           AmazonS3 s3Client, AmazonDynamoDB dynamoClient,
+                                           AmazonS3 s3Client, AmazonDynamoDB dynamoClient, Configuration conf,
                                            IngestJobStatusStore statusStore,
                                            Supplier<Instant> getTime) {
         TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties);
-        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties);
+        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties, conf);
         return new BulkImportJobDriver(new BulkImportSparkSessionRunner(
                 jobRunner, instanceProperties, tablePropertiesProvider, stateStoreProvider),
                 tablePropertiesProvider, stateStoreProvider, statusStore, getTime);
@@ -195,7 +197,8 @@ public static void start(String[] args, BulkImportJobRunner runner) throws Excep
         }
 
         BulkImportJobDriver driver = BulkImportJobDriver.from(runner, instanceProperties,
-                amazonS3, AmazonDynamoDBClientBuilder.defaultClient());
+                amazonS3, AmazonDynamoDBClientBuilder.defaultClient(),
+                HadoopConfigurationProvider.getConfigurationForEMR(instanceProperties));
         driver.run(bulkImportJob, jobRunId, taskId);
     }
 }
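Callers now supply the Hadoop configuration themselves; on EMR it comes from HadoopConfigurationProvider, as start() above shows. A condensed sketch of the updated entry point, assuming the class sits in the same package as BulkImportJobDriver (the job and ID parameters are placeholders):

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import org.apache.hadoop.conf.Configuration;

    import sleeper.configuration.properties.instance.InstanceProperties;
    import sleeper.utils.HadoopConfigurationProvider;

    public class BulkImportExample {
        // Mirrors the updated start() above: the Hadoop configuration is now
        // passed into the factory rather than created inside it.
        public static void runJob(BulkImportJobRunner runner, InstanceProperties instanceProperties,
                                  BulkImportJob bulkImportJob, String jobRunId, String taskId) throws Exception {
            Configuration conf = HadoopConfigurationProvider.getConfigurationForEMR(instanceProperties);
            BulkImportJobDriver driver = BulkImportJobDriver.from(runner, instanceProperties,
                    AmazonS3ClientBuilder.defaultClient(),
                    AmazonDynamoDBClientBuilder.defaultClient(),
                    conf);
            driver.run(bulkImportJob, jobRunId, taskId);
        }
    }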
@@ -93,6 +93,7 @@ public BulkImportJobOutput run(BulkImportJob job) throws IOException {
         try {
             allPartitions = stateStore.getAllPartitions();
         } catch (StateStoreException e) {
+            LOGGER.error("Could not load partitions", e);
             throw new RuntimeException("Failed to load statestore. Are permissions correct for this service account?");
         }
 
@@ -21,6 +21,8 @@
 
 import sleeper.core.statestore.FileInfo;
 
+import java.time.Instant;
+
 public class SparkFileInfoRow {
 
     private SparkFileInfoRow() {
@@ -34,6 +36,7 @@ public static FileInfo createFileInfo(Row row) {
         return FileInfo.builder()
                 .filename(row.getAs(FILENAME_FIELD_NAME))
                 .jobId(null)
+                .lastStateStoreUpdateTime(Instant.now())
                 .fileStatus(FileInfo.FileStatus.ACTIVE)
                 .partitionId(row.getAs(PARTITION_FIELD_NAME))
                 .numberOfRecords(row.getAs(NUM_RECORDS_FIELD_NAME))
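With this addition every FileInfo produced from a Spark row records when the state store last updated it, a field the S3 state store persists. A standalone sketch of the resulting builder chain (the literal field values and the final build() call are assumptions for illustration):

    import java.time.Instant;

    import sleeper.core.statestore.FileInfo;

    public class FileInfoExample {
        public static FileInfo exampleFileInfo() {
            return FileInfo.builder()
                    .filename("s3a://data-bucket/table/partition-root/file.parquet") // invented value
                    .jobId(null)                             // not yet assigned to a compaction job
                    .lastStateStoreUpdateTime(Instant.now()) // the field added in this commit
                    .fileStatus(FileInfo.FileStatus.ACTIVE)
                    .partitionId("root")                     // invented value
                    .numberOfRecords(100L)                   // invented value
                    .build();
        }
    }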
@@ -20,6 +20,7 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.jupiter.api.AfterAll;
@@ -59,8 +60,10 @@
 import sleeper.ingest.job.status.WriteToMemoryIngestJobStatusStore;
 import sleeper.io.parquet.record.ParquetRecordReader;
 import sleeper.io.parquet.record.ParquetRecordWriterFactory;
-import sleeper.statestore.StateStoreProvider;
+import sleeper.statestore.dynamodb.DynamoDBStateStore;
 import sleeper.statestore.dynamodb.DynamoDBStateStoreCreator;
+import sleeper.statestore.s3.S3StateStore;
+import sleeper.statestore.s3.S3StateStoreCreator;
 
 import java.io.BufferedWriter;
 import java.io.FileWriter;
@@ -84,6 +87,7 @@
 import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.CONFIG_BUCKET;
 import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.DATA_BUCKET;
 import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION;
+import static sleeper.configuration.properties.table.TableProperty.STATESTORE_CLASSNAME;
 import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME;
 import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client;
 import static sleeper.core.record.process.RecordsProcessedSummaryTestData.summary;
@@ -166,11 +170,18 @@ public InstanceProperties createInstanceProperties(AmazonS3 s3Client, String dir
 
         s3Client.createBucket(instanceProperties.get(CONFIG_BUCKET));
         new DynamoDBStateStoreCreator(instanceProperties, dynamoDBClient).create();
+        new S3StateStoreCreator(instanceProperties, dynamoDBClient).create();
 
         return instanceProperties;
     }
 
     public TableProperties createTable(InstanceProperties instanceProperties) {
+        TableProperties tableProperties = createTableProperties(instanceProperties);
+        tableProperties.saveToS3(s3Client);
+        return tableProperties;
+    }
+
+    public TableProperties createTableProperties(InstanceProperties instanceProperties) {
         String tableName = UUID.randomUUID().toString();
         TableProperties tableProperties = new TableProperties(instanceProperties);
         tableProperties.set(TABLE_NAME, tableName);
@@ -270,8 +281,7 @@ private static void writeRecordsToFile(List<Record> records, String file) throws
     }
 
     private static StateStore initialiseStateStore(AmazonDynamoDB dynamoDBClient, InstanceProperties instanceProperties, TableProperties tableProperties, List<Object> splitPoints) throws StateStoreException {
-        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties);
-        StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
+        StateStore stateStore = new DynamoDBStateStore(instanceProperties, tableProperties, dynamoDBClient);
         stateStore.initialise(new PartitionsFromSplitPoints(getSchema(), splitPoints).construct());
         return stateStore;
     }
@@ -280,11 +290,17 @@ private static StateStore initialiseStateStore(AmazonDynamoDB dynamoDBClient, In
         return initialiseStateStore(dynamoDBClient, instanceProperties, tableProperties, Collections.emptyList());
     }
 
+    private static StateStore initialiseS3StateStore(AmazonDynamoDB dynamoDBClient, InstanceProperties instanceProperties, TableProperties tableProperties) throws StateStoreException {
+        StateStore stateStore = new S3StateStore(instanceProperties, tableProperties, dynamoDBClient, new Configuration());
+        stateStore.initialise();
+        return stateStore;
+    }
+
     private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job) throws IOException {
         String jobRunId = "test-run";
         statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build());
         BulkImportJobDriver driver = BulkImportJobDriver.from(runner, properties,
-                s3Client, dynamoDBClient, statusStore,
+                s3Client, dynamoDBClient, new Configuration(), statusStore,
                 List.of(startTime, endTime).iterator()::next);
         driver.run(job, jobRunId, taskId);
     }
@@ -530,4 +546,54 @@ void shouldNotThrowExceptionIfProvidedWithDirectoryWhichContainsParquetAndNonPar
         finishedIngestJobWithValidation(job.toIngestJob(), taskId, validationTime,
                 summary(startTime, endTime, records.size(), records.size())));
     }
+
+    @ParameterizedTest
+    @MethodSource("getParameters")
+    void shouldImportDataWithS3StateStore(BulkImportJobRunner runner) throws IOException, StateStoreException {
+        // Given
+        //  - Instance and table properties
+        String dataDir = folder.toString();
+        InstanceProperties instanceProperties = createInstanceProperties(s3Client, dataDir);
+        TableProperties tableProperties = createTableProperties(instanceProperties);
+        tableProperties.set(STATESTORE_CLASSNAME, S3StateStore.class.getName());
+        tableProperties.saveToS3(s3Client);
+        //  - Write some data to be imported
+        List<Record> records = getRecords();
+        writeRecordsToFile(records, dataDir + "/import/a.parquet");
+        List<String> inputFiles = new ArrayList<>();
+        inputFiles.add(dataDir + "/import/a.parquet");
+        //  - State store
+        StateStore stateStore = initialiseS3StateStore(dynamoDBClient, instanceProperties, tableProperties);
+
+        // When
+        BulkImportJob job = BulkImportJob.builder().id("my-job").files(inputFiles)
+                .tableName(tableProperties.get(TABLE_NAME)).build();
+        runJob(runner, instanceProperties, job);
+
+        // Then
+        List<FileInfo> activeFiles = stateStore.getActiveFiles();
+        List<Record> readRecords = new ArrayList<>();
+        for (FileInfo fileInfo : activeFiles) {
+            try (ParquetRecordReader reader = new ParquetRecordReader(new Path(fileInfo.getFilename()), schema)) {
+                List<Record> recordsInThisFile = new ArrayList<>();
+                Record record = reader.read();
+                while (null != record) {
+                    Record clonedRecord = new Record(record);
+                    readRecords.add(clonedRecord);
+                    recordsInThisFile.add(clonedRecord);
+                    record = reader.read();
+                }
+                assertThat(recordsInThisFile).isSortedAccordingTo(new RecordComparator(getSchema()));
+            }
+        }
+        assertThat(readRecords).hasSameSizeAs(records);
+
+        List<Record> expectedRecords = new ArrayList<>(records);
+        sortRecords(expectedRecords);
+        sortRecords(readRecords);
+        assertThat(readRecords).isEqualTo(expectedRecords);
+        assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_NAME))).containsExactly(
+                finishedIngestJobWithValidation(job.toIngestJob(), taskId, validationTime,
+                        summary(startTime, endTime, records.size(), records.size())));
+    }
 }
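The new test selects the S3-backed store by writing S3StateStore's class name into the table's STATESTORE_CLASSNAME property before saving the table, then constructs and initialises the store directly. The same sequence as an isolated sketch (import paths are inferred from this diff):

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.s3.AmazonS3;
    import org.apache.hadoop.conf.Configuration;

    import sleeper.configuration.properties.instance.InstanceProperties;
    import sleeper.configuration.properties.table.TableProperties;
    import sleeper.core.statestore.StateStore;
    import sleeper.core.statestore.StateStoreException;
    import sleeper.statestore.s3.S3StateStore;

    import static sleeper.configuration.properties.table.TableProperty.STATESTORE_CLASSNAME;

    public class S3StateStoreExample {
        // Points a table at the S3 state store, then creates and initialises it.
        public static StateStore createS3StateStore(AmazonS3 s3Client, AmazonDynamoDB dynamoDBClient,
                                                    InstanceProperties instanceProperties,
                                                    TableProperties tableProperties) throws StateStoreException {
            tableProperties.set(STATESTORE_CLASSNAME, S3StateStore.class.getName());
            tableProperties.saveToS3(s3Client);
            StateStore stateStore = new S3StateStore(instanceProperties, tableProperties,
                    dynamoDBClient, new Configuration());
            stateStore.initialise();
            return stateStore;
        }
    }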
@@ -36,6 +36,7 @@
 import sleeper.ingest.job.status.IngestJobStatusStore;
 import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory;
 import sleeper.statestore.StateStoreProvider;
+import sleeper.utils.HadoopConfigurationProvider;
 import sleeper.utils.HadoopPathUtils;
 
 import java.time.Instant;
@@ -69,10 +70,10 @@ public BulkImportStarterLambda() {
         TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties);
         PlatformExecutor platformExecutor = PlatformExecutor.fromEnvironment(
                 instanceProperties, tablePropertiesProvider);
-        hadoopConfig = new Configuration();
+        hadoopConfig = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties);
         ingestJobStatusStore = IngestJobStatusStoreFactory.getStatusStore(dynamo, instanceProperties);
         executor = new BulkImportExecutor(instanceProperties, tablePropertiesProvider,
-                new StateStoreProvider(dynamo, instanceProperties),
+                new StateStoreProvider(dynamo, instanceProperties, hadoopConfig),
                 ingestJobStatusStore, s3, platformExecutor, Instant::now);
         invalidJobIdSupplier = () -> UUID.randomUUID().toString();
         timeSupplier = Instant::now;
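HadoopConfigurationProvider gives each runtime an appropriately sourced configuration instead of a bare new Configuration(): getConfigurationForLambdas here, getConfigurationForEMR in the Spark driver above. The lambda-side wiring reduces to this fragment (both variables come from the surrounding constructor):

    // dynamo and instanceProperties are fields set up earlier in the constructor.
    Configuration hadoopConfig = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties);
    StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamo, instanceProperties, hadoopConfig);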
1 change: 1 addition & 0 deletions java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java
@@ -95,6 +95,7 @@ public TableStack(
                 .runtime(Runtime.JAVA_11));
 
         configBucket.grantReadWrite(sleeperTableLambda);
+        dataStack.getDataBucket().grantReadWrite(sleeperTableLambda); // S3 state store stores its state in data bucket
 
         Provider sleeperTableProvider = Provider.Builder.create(this, "SleeperTableProvider")
                 .onEventHandler(sleeperTableLambda)
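Since the S3 state store keeps its state files in the instance's data bucket, the table-creation lambda now needs read/write access there as well as to the config bucket. The pair of grants as a CDK sketch (the bucket and function parameters stand in for the ones this stack builds):

    import software.amazon.awscdk.services.lambda.IFunction;
    import software.amazon.awscdk.services.s3.IBucket;

    public class TableStackGrants {
        // Table properties live in the config bucket; the S3 state store keeps
        // its state files in the data bucket, so the lambda needs both grants.
        public static void grantTableLambdaAccess(IBucket configBucket, IBucket dataBucket,
                                                  IFunction sleeperTableLambda) {
            configBucket.grantReadWrite(sleeperTableLambda);
            dataBucket.grantReadWrite(sleeperTableLambda);
        }
    }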
6 changes: 3 additions & 3 deletions java/clients/src/main/java/sleeper/clients/QueryClient.java
@@ -57,10 +57,10 @@ public class QueryClient extends QueryCommandLineClient {
     private final ExecutorService executorService;
     private final Map<String, QueryExecutor> cachedQueryExecutors = new HashMap<>();
 
-    public QueryClient(AmazonS3 s3Client, InstanceProperties instanceProperties, AmazonDynamoDB dynamoClient) throws ObjectFactoryException {
+    public QueryClient(AmazonS3 s3Client, InstanceProperties instanceProperties, AmazonDynamoDB dynamoClient, Configuration conf) throws ObjectFactoryException {
         super(s3Client, instanceProperties);
         this.objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp");
-        this.stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties);
+        this.stateStoreProvider = new StateStoreProvider(dynamoClient, instanceProperties, conf);
         this.executorService = Executors.newFixedThreadPool(30);
     }
@@ -121,7 +121,7 @@ public static void main(String[] args) throws StateStoreException, ObjectFactory
         AmazonDynamoDB dynamoDB = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard());
         InstanceProperties instanceProperties = ClientUtils.getInstanceProperties(amazonS3, args[0]);
 
-        QueryClient queryClient = new QueryClient(amazonS3, instanceProperties, dynamoDB);
+        QueryClient queryClient = new QueryClient(amazonS3, instanceProperties, dynamoDB, new Configuration());
         queryClient.run();
     }
 }
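The interactive query client follows the same pattern: main() now passes a Configuration through to the constructor. A condensed sketch of the new invocation, assuming the same package as QueryClient and loading properties straight from the config bucket as the handlers above do:

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import org.apache.hadoop.conf.Configuration;

    import sleeper.configuration.properties.instance.InstanceProperties;

    public class QueryClientExample {
        public static void main(String[] args) throws Exception {
            AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
            AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
            InstanceProperties instanceProperties = new InstanceProperties();
            instanceProperties.loadFromS3(amazonS3, args[0]); // args[0]: config bucket name

            // The trailing Configuration is the parameter added in this commit.
            QueryClient queryClient = new QueryClient(amazonS3, instanceProperties, dynamoDB, new Configuration());
            queryClient.run();
        }
    }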
@@ -124,7 +124,7 @@ private void ingestRecords(InstanceProperties instanceProperties, TablePropertie
                 .objectFactory(ObjectFactory.noUserJars())
                 .localDir(tempDir.toString())
                 .hadoopConfiguration(getHadoopConfiguration())
-                .stateStoreProvider(new StateStoreProvider(dynamoDB, instanceProperties))
+                .stateStoreProvider(new StateStoreProvider(dynamoDB, instanceProperties, null))
                 .s3AsyncClient(createS3AsyncClient())
                 .build().ingestFromRecordIteratorAndClose(tableProperties, new WrappedIterator<>(records.iterator()));
     }
@@ -61,7 +61,7 @@ public class DockerInstanceTestBase {
 
     public CloseableIterator<Record> queryAllRecords(
             InstanceProperties instanceProperties, TableProperties tableProperties) throws Exception {
-        StateStore stateStore = new StateStoreProvider(dynamoDB, instanceProperties).getStateStore(tableProperties);
+        StateStore stateStore = new StateStoreProvider(dynamoDB, instanceProperties, null).getStateStore(tableProperties);
         PartitionTree tree = new PartitionTree(tableProperties.getSchema(), stateStore.getAllPartitions());
         QueryExecutor executor = new QueryExecutor(ObjectFactory.noUserJars(), tableProperties,
                 stateStore, getHadoopConfiguration(), Executors.newSingleThreadExecutor());
@@ -107,7 +107,8 @@ public CreateJobsLambda(InstanceProperties instanceProperties,
                 .build();
         this.tablePropertiesProvider = new TablePropertiesProvider(s3Client, instanceProperties);
         this.propertiesReloader = PropertiesReloader.ifConfigured(s3Client, instanceProperties, tablePropertiesProvider);
-        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties);
+        this.stateStoreProvider = new StateStoreProvider(dynamoDBClient, instanceProperties,
+                HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties));
         this.tableLister = new TableLister(s3Client, instanceProperties);
         this.jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
     }
@@ -86,7 +86,7 @@ public void setUp() throws Exception {
         AmazonDynamoDB dynamoDB = createDynamoClient();
         TableProperties tableProperties = createTable(s3, dynamoDB, instanceProperties, schema);
         TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties);
-        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties);
+        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties, null);
         stateStore = stateStoreProvider.getStateStore(tableProperties);
         stateStore.initialise();
         DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDB);
@@ -144,7 +144,7 @@ void shouldDeleteMessages() throws Exception {
         String tableName = UUID.randomUUID().toString();
         InstanceProperties instanceProperties = createProperties(s3);
         TableProperties tableProperties = createTable(s3, dynamoDB, instanceProperties, tableName, schema);
-        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties);
+        StateStoreProvider stateStoreProvider = new StateStoreProvider(dynamoDB, instanceProperties, null);
         TablePropertiesProvider tablePropertiesProvider = new TablePropertiesProvider(s3, instanceProperties);
         StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
         stateStore.initialise();
