Skip to content

Commit

Permalink
Simplify DistSQLUpdateExecutor (#29890)
Browse files Browse the repository at this point in the history
* Refactor DistSQLUpdateExecuteEngine

* Refactor AlterStorageUnitExecutor

* Simplify DistSQLUpdateExecutor

* Fix test cases
  • Loading branch information
terrymanu authored Jan 28, 2024
1 parent 53f9f49 commit 678dac7
Show file tree
Hide file tree
Showing 21 changed files with 156 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,17 @@ public void executeQuery() throws SQLException {

@SuppressWarnings({"rawtypes", "unchecked"})
private void setRule(final DistSQLExecutorRuleAware executor) {
Optional<ShardingSphereRule> globalRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(executor.getRuleClass());
if (globalRule.isPresent()) {
executor.setRule(globalRule.get());
return;
Optional<ShardingSphereRule> rule = findRule(executor.getRuleClass());
if (rule.isPresent()) {
executor.setRule(rule.get());
} else {
rows = Collections.emptyList();
}
ShardingSphereDatabase database = getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName));
Optional<ShardingSphereRule> databaseRule = database.getRuleMetaData().findSingleRule(executor.getRuleClass());
if (databaseRule.isPresent()) {
executor.setRule(databaseRule.get());
return;
}
rows = Collections.emptyList();
}

private Optional<ShardingSphereRule> findRule(final Class<ShardingSphereRule> ruleClass) {
Optional<ShardingSphereRule> globalRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(ruleClass);
return globalRule.isPresent() ? globalRule : getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName)).getRuleMetaData().findSingleRule(ruleClass);
}

protected abstract ShardingSphereDatabase getDatabase(String databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,22 @@ public void executeUpdate() throws SQLException {
executor.executeUpdate(sqlStatement, contextManager);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void checkBeforeUpdate(final DistSQLUpdateExecutor executor) {
private void checkBeforeUpdate(final DistSQLUpdateExecutor<?> executor) {
if (null != executor.getClass().getAnnotation(DistSQLExecutorClusterModeRequired.class)) {
ShardingSpherePreconditions.checkState(contextManager.getInstanceContext().isCluster(), () -> new UnsupportedSQLOperationException("Mode must be `Cluster`."));
}
executor.checkBeforeUpdate(sqlStatement, contextManager);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private void setRule(final DistSQLExecutorRuleAware executor) {
Optional<ShardingSphereRule> globalRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(executor.getRuleClass());
if (globalRule.isPresent()) {
executor.setRule(globalRule.get());
return;
}
ShardingSphereDatabase database = getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName));
Optional<ShardingSphereRule> databaseRule = database.getRuleMetaData().findSingleRule(executor.getRuleClass());
if (databaseRule.isPresent()) {
executor.setRule(databaseRule.get());
return;
}
throw new UnsupportedSQLOperationException(String.format("The current database has no `%s` rules", executor.getRuleClass()));
private void setRule(final DistSQLExecutorRuleAware executor) throws UnsupportedSQLOperationException {
Optional<ShardingSphereRule> rule = findRule(executor.getRuleClass());
ShardingSpherePreconditions.checkState(rule.isPresent(), () -> new UnsupportedSQLOperationException(String.format("The current database has no `%s` rules", executor.getRuleClass())));
executor.setRule(rule.get());
}

private Optional<ShardingSphereRule> findRule(final Class<ShardingSphereRule> ruleClass) {
Optional<ShardingSphereRule> globalRule = contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(ruleClass);
return globalRule.isPresent() ? globalRule : getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName)).getRuleMetaData().findSingleRule(ruleClass);
}

protected abstract ShardingSphereDatabase getDatabase(String databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@
@SingletonSPI
public interface DistSQLUpdateExecutor<T extends DistSQLStatement> extends TypedSPI {

/**
* Check before update.
*
* @param sqlStatement SQL statement
* @param contextManager context manager
*/
default void checkBeforeUpdate(T sqlStatement, ContextManager contextManager) {
}

/**
* Execute update.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate

private ShardingSphereDatabase database;

@Override
public void checkBeforeUpdate(final MigrateTableStatement sqlStatement, final ContextManager contextManager) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
}

@Override
public void executeUpdate(final MigrateTableStatement sqlStatement, final ContextManager contextManager) {
checkTargetDatabase(sqlStatement);
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
jobAPI.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}

private void checkTargetDatabase(final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
}

@Override
public Class<MigrateTableStatement> getType() {
return MigrateTableStatement.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,9 @@ public final class RegisterMigrationSourceStorageUnitExecutor implements DistSQL

private final DataSourcePoolPropertiesValidator validateHandler = new DataSourcePoolPropertiesValidator();

@Override
public void checkBeforeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement, final ContextManager contextManager) {
ShardingSpherePreconditions.checkState(sqlStatement.getDataSources().stream().noneMatch(HostnameAndPortBasedDataSourceSegment.class::isInstance),
() -> new UnsupportedSQLOperationException("Not currently support add hostname and port, please use url"));
}

@Override
public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement, final ContextManager contextManager) {
checkDataSource(sqlStatement);
List<DataSourceSegment> dataSources = new ArrayList<>(sqlStatement.getDataSources());
URLBasedDataSourceSegment urlBasedDataSourceSegment = (URLBasedDataSourceSegment) dataSources.get(0);
DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Expand All @@ -65,6 +60,11 @@ public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlS
jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
}

private void checkDataSource(final RegisterMigrationSourceStorageUnitStatement sqlStatement) {
ShardingSpherePreconditions.checkState(sqlStatement.getDataSources().stream().noneMatch(HostnameAndPortBasedDataSourceSegment.class::isInstance),
() -> new UnsupportedSQLOperationException("Not currently support add hostname and port, please use url"));
}

@Override
public Class<RegisterMigrationSourceStorageUnitStatement> getType() {
return RegisterMigrationSourceStorageUnitStatement.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,11 @@
@DistSQLExecutorClusterModeRequired
public final class LockClusterExecutor implements DistSQLUpdateExecutor<LockClusterStatement> {

@Override
public void checkBeforeUpdate(final LockClusterStatement sqlStatement, final ContextManager contextManager) {
checkState(contextManager);
checkAlgorithm(sqlStatement);
}

private void checkState(final ContextManager contextManager) {
ClusterState currentState = contextManager.getClusterStateContext().getCurrentState();
ShardingSpherePreconditions.checkState(ClusterState.OK == currentState, () -> new IllegalStateException("Cluster is already locked"));
}

private void checkAlgorithm(final LockClusterStatement sqlStatement) {
ShardingSpherePreconditions.checkState(isStrategyDefinitionExists(sqlStatement), MissingRequiredAlgorithmException::new);
TypedSPILoader.checkService(ClusterLockStrategy.class, sqlStatement.getLockStrategy().getName(), sqlStatement.getLockStrategy().getProps());
}

private boolean isStrategyDefinitionExists(final LockClusterStatement sqlStatement) {
return null != sqlStatement.getLockStrategy();
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void executeUpdate(final LockClusterStatement sqlStatement, final ContextManager contextManager) {
checkState(contextManager);
checkAlgorithm(sqlStatement);
LockContext lockContext = contextManager.getInstanceContext().getLockContext();
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockNames.CLUSTER_LOCK.getLockName());
if (lockContext.tryLock(lockDefinition, 3000L)) {
Expand All @@ -71,6 +53,16 @@ public void executeUpdate(final LockClusterStatement sqlStatement, final Context
}
}

private void checkState(final ContextManager contextManager) {
ClusterState currentState = contextManager.getClusterStateContext().getCurrentState();
ShardingSpherePreconditions.checkState(ClusterState.OK == currentState, () -> new IllegalStateException("Cluster is already locked"));
}

private void checkAlgorithm(final LockClusterStatement sqlStatement) {
ShardingSpherePreconditions.checkNotNull(sqlStatement.getLockStrategy(), MissingRequiredAlgorithmException::new);
TypedSPILoader.checkService(ClusterLockStrategy.class, sqlStatement.getLockStrategy().getName(), sqlStatement.getLockStrategy().getProps());
}

@Override
public Class<LockClusterStatement> getType() {
return LockClusterStatement.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,9 @@ public final class RefreshTableMetaDataExecutor implements DistSQLUpdateExecutor

private ShardingSphereDatabase database;

@Override
public void checkBeforeUpdate(final RefreshTableMetaDataStatement sqlStatement, final ContextManager contextManager) {
checkStorageUnit(contextManager.getStorageUnits(database.getName()), sqlStatement);
}

private void checkStorageUnit(final Map<String, StorageUnit> storageUnits, final RefreshTableMetaDataStatement sqlStatement) {
ShardingSpherePreconditions.checkState(!storageUnits.isEmpty(), () -> new EmptyStorageUnitException(database.getName()));
if (sqlStatement.getStorageUnitName().isPresent()) {
String storageUnitName = sqlStatement.getStorageUnitName().get();
ShardingSpherePreconditions.checkState(
storageUnits.containsKey(storageUnitName), () -> new MissingRequiredStorageUnitsException(database.getName(), Collections.singleton(storageUnitName)));
}
}

@Override
public void executeUpdate(final RefreshTableMetaDataStatement sqlStatement, final ContextManager contextManager) throws SQLException {
checkStorageUnit(contextManager.getStorageUnits(database.getName()), sqlStatement);
String schemaName = getSchemaName(sqlStatement);
if (sqlStatement.getStorageUnitName().isPresent()) {
if (sqlStatement.getTableName().isPresent()) {
Expand All @@ -73,6 +60,15 @@ public void executeUpdate(final RefreshTableMetaDataStatement sqlStatement, fina
}
}

private void checkStorageUnit(final Map<String, StorageUnit> storageUnits, final RefreshTableMetaDataStatement sqlStatement) {
ShardingSpherePreconditions.checkState(!storageUnits.isEmpty(), () -> new EmptyStorageUnitException(database.getName()));
if (sqlStatement.getStorageUnitName().isPresent()) {
String storageUnitName = sqlStatement.getStorageUnitName().get();
ShardingSpherePreconditions.checkState(
storageUnits.containsKey(storageUnitName), () -> new MissingRequiredStorageUnitsException(database.getName(), Collections.singleton(storageUnitName)));
}
}

private String getSchemaName(final RefreshTableMetaDataStatement sqlStatement) {
return sqlStatement.getSchemaName().isPresent() ? sqlStatement.getSchemaName().get() : new DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(database.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,9 @@
*/
public final class SetDistVariableExecutor implements DistSQLUpdateExecutor<SetDistVariableStatement> {

@Override
public void checkBeforeUpdate(final SetDistVariableStatement sqlStatement, final ContextManager contextManager) {
ShardingSpherePreconditions.checkState(getEnumType(sqlStatement.getName()) instanceof TypedPropertyKey, () -> new UnsupportedVariableException(sqlStatement.getName()));
}

@Override
public void executeUpdate(final SetDistVariableStatement sqlStatement, final ContextManager contextManager) throws SQLException {
ShardingSpherePreconditions.checkState(getEnumType(sqlStatement.getName()) instanceof TypedPropertyKey, () -> new UnsupportedVariableException(sqlStatement.getName()));
handleConfigurationProperty(contextManager, (TypedPropertyKey) getEnumType(sqlStatement.getName()), sqlStatement.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
public final class SetInstanceStatusExecutor implements DistSQLUpdateExecutor<SetInstanceStatusStatement> {

@Override
public void checkBeforeUpdate(final SetInstanceStatusStatement sqlStatement, final ContextManager contextManager) {
public void executeUpdate(final SetInstanceStatusStatement sqlStatement, final ContextManager contextManager) {
if ("DISABLE".equals(sqlStatement.getStatus())) {
checkDisablingIsValid(contextManager, sqlStatement.getInstanceId());
} else {
checkEnablingIsValid(contextManager, sqlStatement.getInstanceId());
}
contextManager.getInstanceContext().getEventBusContext().post(
new ComputeNodeStatusChangedEvent(sqlStatement.getInstanceId(), "DISABLE".equals(sqlStatement.getStatus()) ? InstanceState.CIRCUIT_BREAK : InstanceState.OK));
}

private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) {
Expand All @@ -55,12 +57,6 @@ private void checkDisablingIsValid(final ContextManager contextManager, final St
() -> new UnsupportedSQLOperationException(String.format("`%s` compute node has been disabled", instanceId)));
}

@Override
public void executeUpdate(final SetInstanceStatusStatement sqlStatement, final ContextManager contextManager) {
contextManager.getInstanceContext().getEventBusContext().post(
new ComputeNodeStatusChangedEvent(sqlStatement.getInstanceId(), "DISABLE".equals(sqlStatement.getStatus()) ? InstanceState.CIRCUIT_BREAK : InstanceState.OK));
}

@Override
public Class<SetInstanceStatusStatement> getType() {
return SetInstanceStatusStatement.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,10 @@
@DistSQLExecutorClusterModeRequired
public final class UnlockClusterExecutor implements DistSQLUpdateExecutor<UnlockClusterStatement> {

@Override
public void checkBeforeUpdate(final UnlockClusterStatement sqlStatement, final ContextManager contextManager) {
checkState(contextManager);
}

private void checkState(final ContextManager contextManager) {
ClusterState currentState = contextManager.getClusterStateContext().getCurrentState();
ShardingSpherePreconditions.checkState(ClusterState.OK != currentState, () -> new IllegalStateException("Cluster is not locked"));
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void executeUpdate(final UnlockClusterStatement sqlStatement, final ContextManager contextManager) {

checkState(contextManager);
LockContext lockContext = contextManager.getInstanceContext().getLockContext();
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockNames.CLUSTER_LOCK.getLockName());
if (lockContext.tryLock(lockDefinition, 3000L)) {
Expand All @@ -61,6 +51,10 @@ public void executeUpdate(final UnlockClusterStatement sqlStatement, final Conte
}
}

private void checkState(final ContextManager contextManager) {
ShardingSpherePreconditions.checkState(ClusterState.OK != contextManager.getClusterStateContext().getCurrentState(), () -> new IllegalStateException("Cluster is not locked"));
}

@Override
public Class<UnlockClusterStatement> getType() {
return UnlockClusterStatement.class;
Expand Down
Loading

0 comments on commit 678dac7

Please sign in to comment.