Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor by cr
Browse files Browse the repository at this point in the history
ZuLiangWang committed Nov 8, 2023
1 parent efae3e7 commit cd06007
Showing 11 changed files with 64 additions and 84 deletions.
2 changes: 1 addition & 1 deletion server/cluster/manager.go
Original file line number Diff line number Diff line change
@@ -325,7 +325,7 @@ func (m *managerImpl) DropTable(ctx context.Context, clusterName, schemaName, ta
return metadata.ErrVersionNotFound
}

_, err = cluster.metadata.DropTable(ctx, metadata.DropTableRequest{
err = cluster.metadata.DropTable(ctx, metadata.DropTableRequest{
SchemaName: schemaName,
TableName: tableName,
ShardID: shardID,
10 changes: 5 additions & 5 deletions server/cluster/manager_test.go
Original file line number Diff line number Diff line change
@@ -176,11 +176,11 @@ func testCreateTable(ctx context.Context, re *require.Assertions, manager cluste
c, err := manager.GetCluster(ctx, clusterName)
re.NoError(err)
_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
ShardID: shardID,
LatestShardVersion: 0,
SchemaName: schema,
TableName: tableName,
PartitionInfo: storage.PartitionInfo{Info: nil},
ShardID: shardID,
LatestVersion: 0,
SchemaName: schema,
TableName: tableName,
PartitionInfo: storage.PartitionInfo{Info: nil},
})
re.NoError(err)
}
40 changes: 19 additions & 21 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
@@ -191,41 +191,37 @@ func (c *ClusterMetadata) GetShardTables(shardIDs []storage.ShardID) map[storage

// DropTable will drop table metadata and all mapping of this table.
// If the table to be dropped has been opened multiple times, all its mapping will be dropped.
func (c *ClusterMetadata) DropTable(ctx context.Context, request DropTableRequest) (DropTableResult, error) {
func (c *ClusterMetadata) DropTable(ctx context.Context, request DropTableRequest) error {
c.logger.Info("drop table start", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

var dropRes DropTableResult
if !c.ensureClusterStable() {
return dropRes, errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
}

table, ok, err := c.tableManager.GetTable(request.SchemaName, request.TableName)
if err != nil {
return dropRes, errors.WithMessage(err, "get table")
return errors.WithMessage(err, "get table")
}

if !ok {
return dropRes, ErrTableNotFound
return ErrTableNotFound
}

// Drop table.
err = c.tableManager.DropTable(ctx, request.SchemaName, request.TableName)
if err != nil {
return dropRes, errors.WithMessage(err, "table manager drop table")
return errors.WithMessage(err, "table manager drop table")
}

// Remove dropped table in shard view.
updateVersions, err := c.topologyManager.RemoveTable(ctx, request.ShardID, request.LatestVersion, []storage.TableID{table.ID})
err = c.topologyManager.RemoveTable(ctx, request.ShardID, request.LatestVersion, []storage.TableID{table.ID})
if err != nil {
return dropRes, errors.WithMessage(err, "topology manager remove table")
return errors.WithMessage(err, "topology manager remove table")
}

dropRes = DropTableResult{
ShardVersionUpdate: updateVersions,
}
c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName), zap.String("dropResult", fmt.Sprintf("%+v", dropRes)))
c.logger.Info("drop table success", zap.String("cluster", c.Name()), zap.String("schemaName", request.SchemaName), zap.String("tableName", request.TableName))

return dropRes, nil
return nil
}

// MigrateTable used to migrate tables from old shard to new shard.
@@ -256,12 +252,12 @@ func (c *ClusterMetadata) MigrateTable(ctx context.Context, request MigrateTable
tableIDs = append(tableIDs, table.ID)
}

if _, err := c.topologyManager.RemoveTable(ctx, request.OldShardID, request.latestOldShardVersion, tableIDs); err != nil {
if err := c.topologyManager.RemoveTable(ctx, request.OldShardID, request.latestOldShardVersion, tableIDs); err != nil {
c.logger.Error("remove table from topology")
return err
}

if _, err := c.topologyManager.AddTable(ctx, request.NewShardID, request.latestNewShardVersion, tables); err != nil {
if err := c.topologyManager.AddTable(ctx, request.NewShardID, request.latestNewShardVersion, tables); err != nil {
c.logger.Error("add table from topology")
return err
}
@@ -318,14 +314,13 @@ func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardVersionUpda
}

// Add table to topology manager.
result, err := c.topologyManager.AddTable(ctx, shardVersionUpdate.ShardID, shardVersionUpdate.LatestVersion, []storage.Table{table})
err := c.topologyManager.AddTable(ctx, shardVersionUpdate.ShardID, shardVersionUpdate.LatestVersion, []storage.Table{table})
if err != nil {
return CreateTableResult{}, errors.WithMessage(err, "topology manager add table")
}

ret := CreateTableResult{
Table: table,
ShardVersionUpdate: result,
Table: table,
}
c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret)))
return ret, nil
@@ -381,14 +376,17 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, request CreateTableRe
}

// Add table to topology manager.
result, err := c.topologyManager.AddTable(ctx, request.ShardID, request.LatestShardVersion, []storage.Table{table})
err = c.topologyManager.AddTable(ctx, request.ShardID, request.LatestVersion, []storage.Table{table})
if err != nil {
return CreateTableResult{}, errors.WithMessage(err, "topology manager add table")
}

ret := CreateTableResult{
Table: table,
ShardVersionUpdate: result,
Table: table,
ShardVersionUpdate: ShardVersionUpdate{
ShardID: request.ShardID,
LatestVersion: request.LatestVersion,
},
}
c.logger.Info("create table succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret)))
return ret, nil
13 changes: 6 additions & 7 deletions server/cluster/metadata/cluster_metadata_test.go
Original file line number Diff line number Diff line change
@@ -148,11 +148,11 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata

// Test create table.
createResult, err := m.CreateTable(ctx, metadata.CreateTableRequest{
ShardID: 0,
LatestShardVersion: 0,
SchemaName: testSchema,
TableName: testTableName,
PartitionInfo: storage.PartitionInfo{Info: nil},
ShardID: 0,
LatestVersion: 0,
SchemaName: testSchema,
TableName: testTableName,
PartitionInfo: storage.PartitionInfo{Info: nil},
})
re.NoError(err)
re.Equal(testTableName, createResult.Table.Name)
@@ -178,14 +178,13 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata
re.Equal(storage.ShardID(1), routeResult.RouteEntries[testTableName].NodeShards[0].ShardInfo.ID)

// Drop table already created.
dropResult, err := m.DropTable(ctx, metadata.DropTableRequest{
err = m.DropTable(ctx, metadata.DropTableRequest{
SchemaName: testSchema,
TableName: testTableName,
ShardID: storage.ShardID(1),
LatestVersion: 0,
})
re.NoError(err)
re.Equal(storage.ShardID(1), dropResult.ShardVersionUpdate.ShardID)
}

func testShardOperation(ctx context.Context, re *require.Assertions, m *metadata.ClusterMetadata) {
31 changes: 10 additions & 21 deletions server/cluster/metadata/topology_manager.go
Original file line number Diff line number Diff line change
@@ -38,9 +38,9 @@ type TopologyManager interface {
// GetTableIDs get shardNode and tablesIDs with shardID and nodeName.
GetTableIDs(shardIDs []storage.ShardID) map[storage.ShardID]ShardTableIDs
// AddTable add table to cluster topology.
AddTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tables []storage.Table) (ShardVersionUpdate, error)
AddTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tables []storage.Table) error
// RemoveTable remove table on target shards from cluster topology.
RemoveTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tableIDs []storage.TableID) (ShardVersionUpdate, error)
RemoveTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tableIDs []storage.TableID) error
// GetShards get all shards in cluster topology.
GetShards() []storage.ShardID
// GetShardNodesByID get shardNodes with shardID.
@@ -199,18 +199,15 @@ func (m *TopologyManagerImpl) GetTableIDs(shardIDs []storage.ShardID) map[storag
return shardTableIDs
}

func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tables []storage.Table) (ShardVersionUpdate, error) {
func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tables []storage.Table) error {
m.lock.Lock()
defer m.lock.Unlock()

shardView, ok := m.shardTablesMapping[shardID]
var emptyUpdate ShardVersionUpdate
if !ok {
return emptyUpdate, ErrShardNotFound.WithCausef("shard id:%d", shardID)
return ErrShardNotFound.WithCausef("shard id:%d", shardID)
}

latestVersion := shardView.Version

tableIDsToAdd := make([]storage.TableID, 0, len(tables))
for _, table := range tables {
tableIDsToAdd = append(tableIDsToAdd, table.ID)
@@ -229,7 +226,7 @@ func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.Shar
LatestVersion: latestShardVersion,
})
if err != nil {
return emptyUpdate, errors.WithMessage(err, "storage update shard view")
return errors.WithMessage(err, "storage update shard view")
}

// Update shard view in memory.
@@ -243,22 +240,17 @@ func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.Shar
}
}

return ShardVersionUpdate{
ShardID: shardID,
LatestVersion: latestVersion,
}, nil
return nil
}

func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tableIDs []storage.TableID) (ShardVersionUpdate, error) {
func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.ShardID, latestShardVersion uint64, tableIDs []storage.TableID) error {
m.lock.Lock()
defer m.lock.Unlock()

shardView, ok := m.shardTablesMapping[shardID]
var emptyUpdate ShardVersionUpdate
if !ok {
return emptyUpdate, ErrShardNotFound.WithCausef("shard id:%d", shardID)
return ErrShardNotFound.WithCausef("shard id:%d", shardID)
}
latestVersion := shardView.Version

newTableIDs := make([]storage.TableID, 0, len(shardView.TableIDs))
for _, tableID := range shardView.TableIDs {
@@ -276,7 +268,7 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S
ShardView: newShardView,
LatestVersion: latestShardVersion,
}); err != nil {
return emptyUpdate, errors.WithMessage(err, "storage update shard view")
return errors.WithMessage(err, "storage update shard view")
}

// Update shardView in memory.
@@ -296,10 +288,7 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.S
}
}

return ShardVersionUpdate{
ShardID: shardID,
LatestVersion: latestVersion,
}, nil
return nil
}

func (m *TopologyManagerImpl) GetShards() []storage.ShardID {
14 changes: 5 additions & 9 deletions server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
@@ -87,11 +87,11 @@ type CreateTableMetadataResult struct {
}

type CreateTableRequest struct {
ShardID storage.ShardID
LatestShardVersion uint64
SchemaName string
TableName string
PartitionInfo storage.PartitionInfo
ShardID storage.ShardID
LatestVersion uint64
SchemaName string
TableName string
PartitionInfo storage.PartitionInfo
}

type CreateTableResult struct {
@@ -106,10 +106,6 @@ type DropTableRequest struct {
LatestVersion uint64
}

type DropTableResult struct {
ShardVersionUpdate ShardVersionUpdate
}

type DropTableMetadataResult struct {
Table storage.Table
}
Original file line number Diff line number Diff line change
@@ -314,10 +314,8 @@ func finishCallback(event *fsm.Event) {
log.Info("create partition table finish", zap.String("tableName", req.p.params.SourceReq.GetName()))

assert.Assert(req.p.createPartitionTableResult != nil)
var versionUpdate metadata.ShardVersionUpdate
if err := req.p.params.OnSucceeded(metadata.CreateTableResult{
Table: req.p.createPartitionTableResult.Table,
ShardVersionUpdate: versionUpdate,
Table: req.p.createPartitionTableResult.Table,
}); err != nil {
procedure.CancelEventWithLog(event, err, "create partition table on succeeded")
return
Original file line number Diff line number Diff line change
@@ -405,7 +405,7 @@ func dispatchDropDataTable(req *callbackRequest, dispatch eventdispatch.Dispatch
return errors.WithMessagef(err, "drop table, table:%s", tableName)
}

_, err = clusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
err = clusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
SchemaName: req.schemaName(),
TableName: tableName,
ShardID: shardID,
4 changes: 2 additions & 2 deletions server/coordinator/procedure/ddl/droptable/drop_table.go
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ func prepareCallback(event *fsm.Event) {
// In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table.
if !shardExists {
// Try to drop table with the latest shard version.
_, err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
SchemaName: params.SourceReq.GetSchemaName(),
TableName: params.SourceReq.GetName(),
ShardID: shardVersionUpdate.ShardID,
@@ -110,7 +110,7 @@ func prepareCallback(event *fsm.Event) {

log.Debug("dispatch dropTableOnShard finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID))

if _, err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
if err = params.ClusterMetadata.DropTable(req.ctx, metadata.DropTableRequest{
SchemaName: params.SourceReq.GetSchemaName(),
TableName: params.SourceReq.GetName(),
ShardID: shardVersionUpdate.ShardID,
20 changes: 10 additions & 10 deletions server/coordinator/procedure/operation/split/split_test.go
Original file line number Diff line number Diff line change
@@ -41,19 +41,19 @@ func TestSplit(t *testing.T) {

// Create some tables in this shard.
_, err := c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
ShardID: createTableNodeShard.ID,
LatestShardVersion: 0,
SchemaName: test.TestSchemaName,
TableName: test.TestTableName0,
PartitionInfo: storage.PartitionInfo{Info: nil},
ShardID: createTableNodeShard.ID,
LatestVersion: 0,
SchemaName: test.TestSchemaName,
TableName: test.TestTableName0,
PartitionInfo: storage.PartitionInfo{Info: nil},
})
re.NoError(err)
_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
ShardID: createTableNodeShard.ID,
LatestShardVersion: 0,
SchemaName: test.TestSchemaName,
TableName: test.TestTableName1,
PartitionInfo: storage.PartitionInfo{Info: nil},
ShardID: createTableNodeShard.ID,
LatestVersion: 0,
SchemaName: test.TestSchemaName,
TableName: test.TestTableName1,
PartitionInfo: storage.PartitionInfo{Info: nil},
})
re.NoError(err)

8 changes: 4 additions & 4 deletions server/coordinator/shard_picker_test.go
Original file line number Diff line number Diff line change
@@ -59,10 +59,10 @@ func TestLeastTableShardPicker(t *testing.T) {

// Create table on shard 0.
_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
ShardID: 0,
LatestShardVersion: 0,
SchemaName: test.TestSchemaName,
TableName: "test",
ShardID: 0,
LatestVersion: 0,
SchemaName: test.TestSchemaName,
TableName: "test",
PartitionInfo: storage.PartitionInfo{
Info: nil,
},

0 comments on commit cd06007

Please sign in to comment.