Skip to content

Commit

Permalink
Add ResourceDefinitionBackendHandler (#29798)
Browse files Browse the repository at this point in the history
* Refactor RDLBackendHandlerFactory

* Add ResourceDefinitionBackendHandler

* Add ResourceDefinitionBackendHandler

* Add ResourceDefinitionBackendHandler
  • Loading branch information
terrymanu authored Jan 21, 2024
1 parent fd6dd1f commit 4889b3e
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 246 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.distsql.handler.type.rdl;

import org.apache.shardingsphere.distsql.statement.rdl.RDLStatement;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

/**
* RDL executor.
*
* @param <T> type of RQL statement
*/
@SingletonSPI
public interface RDLExecutor<T extends RDLStatement> extends TypedSPI {

/**
* Execute update.
*
* @param sqlStatement SQL statement
*/
void execute(T sqlStatement);

@Override
Class<T> getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit;
package org.apache.shardingsphere.distsql.handler.type.rdl.aware;

import org.apache.shardingsphere.distsql.statement.rdl.StorageUnitDefinitionStatement;
import org.apache.shardingsphere.distsql.handler.type.rdl.RDLExecutor;
import org.apache.shardingsphere.distsql.statement.rdl.RDLStatement;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.RDLBackendHandler;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;

/**
* Storage unit definition backend handler.
*
* @param <T> type of storage unit definition statement
* Database aware RDL executor.
*
* @param <T> type of SQL statement
*/
public abstract class StorageUnitDefinitionBackendHandler<T extends StorageUnitDefinitionStatement> extends RDLBackendHandler<T> {

protected StorageUnitDefinitionBackendHandler(final T sqlStatement, final ConnectionSession connectionSession) {
super(sqlStatement, connectionSession);
}
public interface DatabaseAwareRDLExecutor<T extends RDLStatement> extends RDLExecutor<T> {

protected abstract void checkSQLStatement(ShardingSphereDatabase database, T sqlStatement);
/**
* Set database.
*
* @param database database
*/
void setDatabase(ShardingSphereDatabase database);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.handler.type.rdl.RDLExecutor;
import org.apache.shardingsphere.distsql.handler.type.rdl.aware.DatabaseAwareRDLExecutor;
import org.apache.shardingsphere.distsql.statement.rdl.RDLStatement;
import org.apache.shardingsphere.distsql.statement.rdl.RuleDefinitionStatement;
import org.apache.shardingsphere.distsql.statement.rdl.StorageUnitDefinitionStatement;
import org.apache.shardingsphere.distsql.statement.rdl.alter.AlterStorageUnitStatement;
import org.apache.shardingsphere.distsql.statement.rdl.create.RegisterStorageUnitStatement;
import org.apache.shardingsphere.distsql.statement.rdl.drop.UnregisterStorageUnitStatement;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource.ResourceDefinitionBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule.NewRuleDefinitionBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule.RuleDefinitionBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit.AlterStorageUnitBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit.RegisterStorageUnitBackendHandler;
import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit.UnregisterStorageUnitBackendHandler;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.util.DatabaseNameUtils;

/**
* RDL backend handler factory.
Expand All @@ -59,13 +58,12 @@ public static ProxyBackendHandler newInstance(final RDLStatement sqlStatement, f
return new RuleDefinitionBackendHandler<>((RuleDefinitionStatement) sqlStatement, connectionSession);
}

private static ProxyBackendHandler getStorageUnitBackendHandler(final StorageUnitDefinitionStatement sqlStatement, final ConnectionSession connectionSession) {
if (sqlStatement instanceof RegisterStorageUnitStatement) {
return new RegisterStorageUnitBackendHandler((RegisterStorageUnitStatement) sqlStatement, connectionSession);
@SuppressWarnings("rawtypes")
private static ResourceDefinitionBackendHandler getStorageUnitBackendHandler(final StorageUnitDefinitionStatement sqlStatement, final ConnectionSession connectionSession) {
RDLExecutor executor = TypedSPILoader.getService(RDLExecutor.class, sqlStatement.getClass());
if (executor instanceof DatabaseAwareRDLExecutor) {
((DatabaseAwareRDLExecutor<?>) executor).setDatabase(ProxyContext.getInstance().getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, connectionSession)));
}
if (sqlStatement instanceof AlterStorageUnitStatement) {
return new AlterStorageUnitBackendHandler((AlterStorageUnitStatement) sqlStatement, connectionSession);
}
return new UnregisterStorageUnitBackendHandler((UnregisterStorageUnitStatement) sqlStatement, connectionSession);
return new ResourceDefinitionBackendHandler(sqlStatement, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit;
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.InvalidStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.type.rdl.aware.DatabaseAwareRDLExecutor;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
Expand All @@ -30,16 +32,12 @@
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl;
import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.ShardingSphereExternalException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;

import java.sql.SQLException;
import java.util.Collection;
Expand All @@ -53,38 +51,31 @@
* Alter storage unit backend handler.
*/
@Slf4j
public final class AlterStorageUnitBackendHandler extends StorageUnitDefinitionBackendHandler<AlterStorageUnitStatement> {
@Setter
public final class AlterStorageUnitExecutor implements DatabaseAwareRDLExecutor<AlterStorageUnitStatement> {

private final DatabaseType databaseType;
private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();

private final DataSourcePoolPropertiesValidateHandler validateHandler;

public AlterStorageUnitBackendHandler(final AlterStorageUnitStatement sqlStatement, final ConnectionSession connectionSession) {
super(sqlStatement, connectionSession);
databaseType = connectionSession.getProtocolType();
validateHandler = new DataSourcePoolPropertiesValidateHandler();
}
private ShardingSphereDatabase database;

@Override
public ResponseHeader execute(final ShardingSphereDatabase database, final AlterStorageUnitStatement sqlStatement) {
checkSQLStatement(database, sqlStatement);
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, sqlStatement.getStorageUnits());
public void execute(final AlterStorageUnitStatement sqlStatement) {
checkSQLStatement(sqlStatement);
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(database.getProtocolType(), sqlStatement.getStorageUnits());
validateHandler.validate(propsMap);
try {
ProxyContext.getInstance().getContextManager().getInstanceContext().getModeContextManager().alterStorageUnits(database.getName(), propsMap);
} catch (final SQLException | ShardingSphereExternalException ex) {
log.error("Alter storage unit failed", ex);
throw new InvalidStorageUnitsException(Collections.singleton(ex.getMessage()));
}
return new UpdateResponseHeader(sqlStatement);
}

@Override
public void checkSQLStatement(final ShardingSphereDatabase database, final AlterStorageUnitStatement sqlStatement) {
private void checkSQLStatement(final AlterStorageUnitStatement sqlStatement) {
Collection<String> toBeAlteredStorageUnitNames = getToBeAlteredStorageUnitNames(sqlStatement);
checkDuplicatedStorageUnitNames(toBeAlteredStorageUnitNames);
checkStorageUnitNameExisted(database, toBeAlteredStorageUnitNames);
checkDatabase(database, sqlStatement);
checkStorageUnitNameExisted(toBeAlteredStorageUnitNames);
checkDatabase(sqlStatement);
}

private Collection<String> getToBeAlteredStorageUnitNames(final AlterStorageUnitStatement sqlStatement) {
Expand All @@ -100,30 +91,29 @@ private Collection<String> getDuplicatedStorageUnitNames(final Collection<String
return storageUnitNames.stream().filter(each -> storageUnitNames.stream().filter(each::equals).count() > 1).collect(Collectors.toList());
}

private void checkStorageUnitNameExisted(final ShardingSphereDatabase database, final Collection<String> storageUnitNames) {
private void checkStorageUnitNameExisted(final Collection<String> storageUnitNames) {
Map<String, StorageUnit> storageUnits = database.getResourceMetaData().getStorageUnits();
Collection<String> notExistedStorageUnitNames = storageUnitNames.stream().filter(each -> !storageUnits.containsKey(each)).collect(Collectors.toList());
ShardingSpherePreconditions.checkState(notExistedStorageUnitNames.isEmpty(), () -> new MissingRequiredStorageUnitsException(database.getName(), notExistedStorageUnitNames));
}

private void checkDatabase(final ShardingSphereDatabase database, final AlterStorageUnitStatement sqlStatement) {
private void checkDatabase(final AlterStorageUnitStatement sqlStatement) {
Map<String, StorageUnit> storageUnits = database.getResourceMetaData().getStorageUnits();
Collection<String> invalidStorageUnitNames = sqlStatement.getStorageUnits().stream().collect(Collectors.toMap(DataSourceSegment::getName, each -> each)).entrySet().stream()
.filter(each -> !isIdenticalDatabase(each.getValue(), storageUnits.get(each.getKey()))).map(Entry::getKey).collect(Collectors.toSet());
.filter(each -> !isSameDatabase(each.getValue(), storageUnits.get(each.getKey()))).map(Entry::getKey).collect(Collectors.toSet());
ShardingSpherePreconditions.checkState(invalidStorageUnitNames.isEmpty(),
() -> new InvalidStorageUnitsException(Collections.singleton(String.format("Cannot alter the database of %s", invalidStorageUnitNames))));
() -> new InvalidStorageUnitsException(Collections.singleton(String.format("Can not alter the database of %s", invalidStorageUnitNames))));
}

private boolean isIdenticalDatabase(final DataSourceSegment segment, final StorageUnit storageUnit) {
private boolean isSameDatabase(final DataSourceSegment segment, final StorageUnit storageUnit) {
String hostName = null;
String port = null;
String database = null;
if (segment instanceof HostnameAndPortBasedDataSourceSegment) {
hostName = ((HostnameAndPortBasedDataSourceSegment) segment).getHostname();
port = ((HostnameAndPortBasedDataSourceSegment) segment).getPort();
database = ((HostnameAndPortBasedDataSourceSegment) segment).getDatabase();
}
if (segment instanceof URLBasedDataSourceSegment) {
} else if (segment instanceof URLBasedDataSourceSegment) {
JdbcUrl segmentJdbcUrl = new StandardJdbcUrlParser().parse(((URLBasedDataSourceSegment) segment).getUrl());
hostName = segmentJdbcUrl.getHostname();
port = String.valueOf(segmentJdbcUrl.getPort());
Expand All @@ -133,4 +123,9 @@ private boolean isIdenticalDatabase(final DataSourceSegment segment, final Stora
return Objects.equals(hostName, connectionProperties.getHostname()) && Objects.equals(port, String.valueOf(connectionProperties.getPort()))
&& Objects.equals(database, connectionProperties.getCatalog());
}

@Override
public Class<AlterStorageUnitStatement> getType() {
return AlterStorageUnitStatement.class;
}
}
Loading

0 comments on commit 4889b3e

Please sign in to comment.