Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to add OperationAttempt #1434

Merged
merged 27 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a22b355
Start refactoring CollectionSchemaObject
amorton Sep 13, 2024
04031fb
WIP refactoring to add OperationAttempt
amorton Sep 19, 2024
5fc7124
WIP - OperationAttempt with read path
amorton Sep 22, 2024
10a9581
WIP - added SchemaOperationAttempt
amorton Sep 23, 2024
d126132
Fix failing unit test shredderFailure()
amorton Sep 23, 2024
f207951
fmt:fmt
amorton Sep 23, 2024
b83a16f
WIP GeneralOperation and InsertAttemptPage
amorton Sep 25, 2024
81c3e09
Integration test fixes
amorton Sep 25, 2024
13b1008
unit test fixes
amorton Sep 25, 2024
a17d2f3
adding comments and code tidy
amorton Sep 26, 2024
f643396
trying to get the response for insertId's correct
amorton Sep 26, 2024
bcc3a9c
trying to get insertedId's correct again
amorton Sep 26, 2024
5cedeba
Test fixes
amorton Sep 26, 2024
5812835
fmt
amorton Sep 26, 2024
93a1369
comments and cleanup
amorton Sep 26, 2024
cf249c3
Merge branch 'main' into ajm/start-collection-refactor
amorton Sep 26, 2024
b347d40
Merge branch 'ajm/start-collection-refactor' into ajm/#1424-refactor-…
amorton Sep 26, 2024
71e20fb
Add CommandResultBuilder
amorton Sep 26, 2024
5cb12bd
Include errorID in response
amorton Sep 26, 2024
7ad4be0
remove log that should not be there
amorton Sep 26, 2024
260282b
move RequestContext into CommandQueryExecutor
amorton Sep 26, 2024
482b44a
comments and tidy
amorton Sep 26, 2024
b4e777a
fix: SchemaAttempt was calling wrong execute
amorton Sep 26, 2024
5b8bea7
compile fix
amorton Sep 26, 2024
57da595
Merge branch 'main' into ajm/#1424-refactor-TableInsertAttempt
amorton Sep 27, 2024
f3fbec9
Merge branch 'main' into ajm/#1424-refactor-TableInsertAttempt
tatu-at-datastax Sep 27, 2024
59e451b
fix failing InsertOneTableIntegrationTest
amorton Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.stargate.sgv2.jsonapi.service.cqldriver.executor;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.service.cqldriver.CQLSessionCache;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Configured to execute queries for a specific command that relies on drive profiles MORE TODO
* WORDS
*
* <p>The following settings should be set via the driver profile:
*
* <ul>
* <li><code>page-size</code>
* <li><code>consistency</code>
* <li><code>serial-consistency</code>
* <li><code>default-idempotence</code>
* </ul>
*/
public class CommandQueryExecutor {

private static final String PROFILE_NAME_SEPARATOR = "-";

public enum QueryTarget {
TABLE,
COLLECTION;

final String profilePrefix;

QueryTarget() {
this.profilePrefix = name().toLowerCase();
amorton marked this conversation as resolved.
Show resolved Hide resolved
}
}

private enum QueryType {
READ,
WRITE,
CREATE_SCHEMA;

final String profileSuffix;

QueryType() {
this.profileSuffix = name().toLowerCase();
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(CommandQueryExecutor.class);

private final CQLSessionCache cqlSessionCache;
private final RequestContext requestContext;
private final QueryTarget queryTarget;

public CommandQueryExecutor(
CQLSessionCache cqlSessionCache, RequestContext requestContext, QueryTarget queryTarget) {
this.cqlSessionCache =
Objects.requireNonNull(cqlSessionCache, "cqlSessionCache must not be null");
this.requestContext = Objects.requireNonNull(requestContext, "requestContext must not be null");
this.queryTarget = queryTarget;
}

public Uni<AsyncResultSet> executeRead(SimpleStatement statement) {
Objects.requireNonNull(statement, "statement must not be null");

statement = withExecutionProfile(statement, QueryType.READ);
return executeAndWrap(statement);
}

public Uni<AsyncResultSet> executeWrite(SimpleStatement statement) {
Objects.requireNonNull(statement, "statement must not be null");

statement = withExecutionProfile(statement, QueryType.WRITE);
return executeAndWrap(statement);
}

public Uni<AsyncResultSet> executeCreateSchema(SimpleStatement statement) {
Objects.requireNonNull(statement, "statement must not be null");

statement = withExecutionProfile(statement, QueryType.CREATE_SCHEMA);
return executeAndWrap(statement);
}

private CqlSession session() {
return cqlSessionCache.getSession(
requestContext.tenantId().orElse(""), requestContext.authToken().orElse(""));
}

private String getExecutionProfile(QueryType queryType) {
return queryTarget.profilePrefix + PROFILE_NAME_SEPARATOR + queryType.profileSuffix;
}

private SimpleStatement withExecutionProfile(SimpleStatement statement, QueryType queryType) {
return statement.setExecutionProfileName(getExecutionProfile(queryType));
}

private Uni<AsyncResultSet> executeAndWrap(SimpleStatement statement) {
return Uni.createFrom().completionStage(session().executeAsync(statement));
}

public static record RequestContext(Optional<String> tenantId, Optional<String> authToken) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.stargate.sgv2.jsonapi.service.cqldriver.executor;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Objects;
import java.util.Optional;

/**
* Holds the paging state, either coming from the user in a request or extracted from a result set.
*
* <p>And then can be used to add it to a statement or to get the string representation to put into
* a {@link io.stargate.sgv2.jsonapi.api.model.command.CommandResult}
*/
public class CqlPagingState {

// Public because some commands do not have any paging state so they can just get this
public static final CqlPagingState EMPTY = new CqlPagingState(true, null, null);

private final boolean isEmpty;
private final String pagingStateString;
private final ByteBuffer pagingStateBuffer;

private CqlPagingState(boolean isEmpty, String pagingStateString, ByteBuffer pagingStateBuffer) {
this.isEmpty = isEmpty;
this.pagingStateString = pagingStateString;
this.pagingStateBuffer = pagingStateBuffer;
}

public static CqlPagingState from(String pagingState) {
return switch (pagingState) {
case null -> EMPTY;
case "" -> EMPTY;
default -> new CqlPagingState(false, pagingState, decode(pagingState));
};
}

public static CqlPagingState from(AsyncResultSet rSet) {
return switch (rSet) {
case AsyncResultSet ars when ars.hasMorePages() -> {
var pagingBuffer = rSet.getExecutionInfo().getPagingState();
yield new CqlPagingState(false, encode(pagingBuffer), pagingBuffer);
}
case null, default -> EMPTY;
};
}

public boolean isEmpty() {
return isEmpty;
}

public SimpleStatement addToStatement(SimpleStatement statement) {
return isEmpty ? statement : statement.setPagingState(pagingStateBuffer);
}

public Optional<String> getPagingStateString() {
return isEmpty ? Optional.empty() : Optional.of(pagingStateString);
}

private static ByteBuffer decode(String pagingState) {
return ByteBuffer.wrap(Base64.getDecoder().decode(pagingState));
}

private static String encode(ByteBuffer pagingState) {
Objects.requireNonNull(pagingState, "pagingState must not be null");
return Base64.getEncoder().encodeToString(pagingState.array());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.stargate.sgv2.jsonapi.service.cqldriver.executor;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import static io.stargate.sgv2.jsonapi.util.CqlIdentifierUtil.cqlIdentifierFromUserInput;

import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
Expand Down Expand Up @@ -254,7 +255,7 @@ protected Uni<Optional<TableMetadata>> getSchema(
.getSession(dataApiRequestInfo)
.getMetadata()
.getKeyspaces()
.get(CqlIdentifier.fromInternal(namespace));
.get(cqlIdentifierFromUserInput(namespace));
} catch (Exception e) {
// TODO: this ^^ is a very wide error catch, confirm what it should actually be catching
return Uni.createFrom().failure(e);
Expand All @@ -267,7 +268,8 @@ protected Uni<Optional<TableMetadata>> getSchema(
// else get the table
// TODO: this should probably use CqlIdentifier.fromCql() (or .fromInternal())
// if we want to support case-sensitive names
return Uni.createFrom().item(keyspaceMetadata.getTable("\"" + collectionName + "\""));
return Uni.createFrom()
.item(keyspaceMetadata.getTable(cqlIdentifierFromUserInput(collectionName)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.stargate.sgv2.jsonapi.service.cqldriver.executor;

import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;

public abstract class TableBasedSchemaObject extends SchemaObject {

private final TableMetadata tableMetadata;

protected TableBasedSchemaObject(SchemaObjectType type, TableMetadata tableMetadata) {
// uses asCql(pretty) so the names do not always include double quotes
this(
type,
tableMetadata == null
? SchemaObjectName.MISSING
: new SchemaObjectName(
tableMetadata.getKeyspace().asCql(true), tableMetadata.getName().asCql(true)),
tableMetadata);
}

// aaron- adding this ctor so for now the CollectionSchemaObject can set the schemaObjectName and
// have the tablemetdata
// be null because it is not used by any collection processing (currently).
protected TableBasedSchemaObject(
SchemaObjectType type, SchemaObjectName name, TableMetadata tableMetadata) {
// uses asCql(pretty) so the names do not always include double quotes
super(type, name);
this.tableMetadata = tableMetadata;
}

public TableMetadata tableMetadata() {
return tableMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@

import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;

public class TableSchemaObject extends SchemaObject {
public class TableSchemaObject extends TableBasedSchemaObject {

public static final SchemaObjectType TYPE = SchemaObjectType.TABLE;

public final TableMetadata tableMetadata;

public TableSchemaObject(TableMetadata tableMetadata) {
// uses asCql(pretty) so the names do not always include double quotes
super(
TYPE,
new SchemaObjectName(
tableMetadata.getKeyspace().asCql(true), tableMetadata.getName().asCql(true)));
this.tableMetadata = tableMetadata;
super(TYPE, tableMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package io.stargate.sgv2.jsonapi.service.operation;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.DataApiRequestInfo;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.CommandQueryExecutor;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.DriverExceptionHandler;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.SchemaObject;
import java.util.Objects;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link Operation} for running any type of {@link OperationAttempt} grouped into a {@link
* OperationAttemptContainer}.
*
* <p>
*
* @param <SchemaT> The type of the schema object that the operation is working with.
* @param <AttemptT> The type of the operation attempt that the operation is working with.
*/
public class GenericOperation<
SchemaT extends SchemaObject, AttemptT extends OperationAttempt<AttemptT, SchemaT>>
implements Operation {

private static final Logger LOGGER = LoggerFactory.getLogger(GenericOperation.class);

private final DriverExceptionHandler<SchemaT> driverExceptionHandler;
private final OperationAttemptContainer<SchemaT, AttemptT> attempts;
private final OperationAttemptPageBuilder<SchemaT, AttemptT> pageBuilder;

/**
* Create a new {@link GenericOperation} with the provided {@link OperationAttemptContainer},
*
* @param attempts The attempts to run, grouped into a container that has config about how to run
* them as a group.
* @param pageBuilder The builder to use for creating the {@link CommandResult} from the attempts.
* @param driverExceptionHandler The handler to use for exceptions thrown by the driver,
* exceptions thrown by the driver are passed through here before being added to the {@link
* OperationAttempt}.
*/
public GenericOperation(
OperationAttemptContainer<SchemaT, AttemptT> attempts,
OperationAttemptPageBuilder<SchemaT, AttemptT> pageBuilder,
DriverExceptionHandler<SchemaT> driverExceptionHandler) {

this.attempts = Objects.requireNonNull(attempts, "attempts cannot be null");
this.pageBuilder = Objects.requireNonNull(pageBuilder, "pageBuilder cannot be null");
this.driverExceptionHandler =
Objects.requireNonNull(driverExceptionHandler, "driverExceptionHandler cannot be null");
}

/**
* Execute the attempts passed to the operation using the provided {@link QueryExecutor}.
*
* <p>This is a generic operation that can be used to execute any type of attempts, the attempts
* are executed using the configuration of the supplied {@link OperationAttemptContainer} and the
* {@link OperationAttempt} itself. The results are grouped using supplied {@link
* OperationAttemptPageBuilder}, which created the {@link CommandResult}. Errors when executing
* the attempts are caught and attached to the {@link OperationAttempt} so the Page Builder can
* include them in the response.
*
* @param dataApiRequestInfo Request information used to get the tenantId and token
* @param queryExecutor The {@link QueryExecutor} to use for executing the attempts
* @return A supplier of {@link CommandResult} that represents the result of running all the
* attempts.
*/
@Override
public Uni<Supplier<CommandResult>> execute(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor) {

LOGGER.debug("execute() starting to process attempts={}", attempts);

return startMulti(dataApiRequestInfo, queryExecutor)
.collect()
.in(() -> pageBuilder, OperationAttemptAccumulator::accumulate)
.onItem()
.invoke(() -> LOGGER.debug("execute() finished processing attempts={}", attempts))
.onItem()
.invoke(attempts::throwIfNotAllTerminal)
.onItem()
.transform(OperationAttemptPageBuilder::getOperationPage);
}

/**
* Start a {@link Multi} for processing the {@link #attempts}, the style of multi depends on the
* attempts configuration.
*
* @return a {@link Multi} that emits {@link AttemptT} according to the attempts configuration.
*/
protected Multi<AttemptT> startMulti(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor) {

// TODO - for now we create the CommandQueryExecutor here , later change the Operation interface
var commandQueryExecutor =
new CommandQueryExecutor(
queryExecutor.getCqlSessionCache(),
new CommandQueryExecutor.RequestContext(
dataApiRequestInfo.getTenantId(), dataApiRequestInfo.getCassandraToken()),
CommandQueryExecutor.QueryTarget.TABLE);

// Common start pattern for all operations
var attemptMulti = Multi.createFrom().iterable(attempts).onItem();

if (attempts.getSequentialProcessing()) {
// We want to process the attempts sequentially, and stop processing the attempts if one fails
// This should not cause the multi to emit a failure, we track the failures in the attempts
// (transformToUniAndConcatenate is for sequential processing)

return attemptMulti.transformToUniAndConcatenate(
attempt -> {
var failFast = attempts.shouldFailFast();
LOGGER.debug(
"startMulti() - dequeuing attempt for sequential processing, failFast={}, attempt={}",
failFast,
attempt);

if (failFast) {
// Stop processing attemps, but we do not want to return a UniFailure, so we set the
// attempt to skipped
// and do not call exectute() on it.
return Uni.createFrom().item(attempt.setSkippedIfReady());
}
return attempt.execute(commandQueryExecutor, driverExceptionHandler);
});
}

// Parallel processing using transformToUniAndMerge() - no extra testing.
return attemptMulti.transformToUniAndMerge(
readAttempt -> readAttempt.execute(commandQueryExecutor, driverExceptionHandler));
}
}
Loading