Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Dec 30, 2024
1 parent cdcc291 commit d2b68b8
Show file tree
Hide file tree
Showing 67 changed files with 3,518 additions and 1,205 deletions.
2 changes: 1 addition & 1 deletion br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, task.FullBackupCmd)
},
}
task.DefineFilterFlags(command, acceptAllTables, false)
task.DefineFilterFlags(command, acceptAllTables, acceptAllTables, false)
task.DefineBackupEBSFlags(command.PersistentFlags())
return command
}
Expand Down
4 changes: 2 additions & 2 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ var (
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemTables = []string{
filterOutSysAndMemKeepPrivilege = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
fmt.Sprintf("!%s.*", utils.WithTemporaryDBNamePrefix("*")),
"!mysql.*",
"mysql.bind_info",
"mysql.user",
Expand Down
6 changes: 4 additions & 2 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, task.FullRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
// default only restore some system tables
task.DefineFilterFlags(command, acceptAllTables, filterOutSysAndMemKeepPrivilege, false)
task.DefineRestoreSnapshotFlags(command)
return command
}
Expand Down Expand Up @@ -254,7 +255,8 @@ func newStreamRestoreCommand() *cobra.Command {
return runRestoreCommand(command, task.PointRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
// default restore only some system tables
task.DefineFilterFlags(command, acceptAllTables, filterOutSysAndMemKeepPrivilege, true)
task.DefineStreamRestoreFlags(command)
return command
}
2 changes: 1 addition & 1 deletion br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newStreamStartCommand() *cobra.Command {
},
}

task.DefineFilterFlags(command, acceptAllTables, true)
task.DefineFilterFlags(command, acceptAllTables, acceptAllTables, true)
task.DefineStreamStartFlags(command.Flags())
return command
}
Expand Down
1 change: 0 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ go_library(
"//pkg/statistics/handle",
"//pkg/statistics/util",
"//pkg/util",
"//pkg/util/table-filter",
"@com_github_google_btree//:btree",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
Expand Down
11 changes: 5 additions & 6 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -675,11 +674,11 @@ func (bc *Client) SetApiVersion(v kvrpcpb.APIVersion) {
bc.apiVersion = v
}

// Client.BuildBackupRangeAndSchema calls BuildBackupRangeAndSchema,
// BuildBackupRangeAndSchema calls BuildBackupRangeAndSchema,
// if the checkpoint mode is used, return the ranges from checkpoint meta
func (bc *Client) BuildBackupRangeAndSchema(
storage kv.Storage,
tableFilter filter.Filter,
tableFilter *utils.CombinedFilter,
backupTS uint64,
isFullBackup bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
Expand Down Expand Up @@ -714,12 +713,12 @@ func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage)
return nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
// BuildBackupRangeAndInitSchema gets KV range and schema of tables.
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
func BuildBackupRangeAndInitSchema(
storage kv.Storage,
tableFilter filter.Filter,
tableFilter *utils.CombinedFilter,
backupTS uint64,
isFullBackup bool,
buildRange bool,
Expand Down Expand Up @@ -815,7 +814,7 @@ func BuildBackupRangeAndInitSchema(

func BuildBackupSchemas(
storage kv.Storage,
tableFilter filter.Filter,
tableFilter *utils.CombinedFilter,
backupTS uint64,
isFullBackup bool,
fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (ss *Schemas) BackupSchemas(
}

if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
schema.dbInfo.Name = utils.WithTemporaryDBNamePrefix(schema.dbInfo.Name.O)
}

var checksum *checkpoint.ChecksumItem
Expand Down
26 changes: 15 additions & 11 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,29 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk := testkit.NewTestKit(t, m.Storage)

// Table t1 is not exist.
testFilter, err := filter.Parse([]string{"test.t1"})
userFilter, err := filter.Parse([]string{"test.t1"})
combinedFilter1 := utils.NewCombinedFilterNoSystem(userFilter)
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(
m.Storage, testFilter, math.MaxUint64, false, true)
m.Storage, combinedFilter1, math.MaxUint64, false, true)
require.NoError(t, err)
require.NotNil(t, backupSchemas)

// Database is not exist.
fooFilter, err := filter.Parse([]string{"foo.t1"})
combinedFilter2 := utils.NewCombinedFilterNoSystem(fooFilter)
require.NoError(t, err)
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, fooFilter, math.MaxUint64, false, true)
m.Storage, combinedFilter2, math.MaxUint64, false, true)
require.NoError(t, err)
require.Nil(t, backupSchemas)

// Empty database.
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*", "!sys.*"})
combinedFilter3 := utils.NewCombinedFilterRejectAll()
require.NoError(t, err)
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, noFilter, math.MaxUint64, false, true)
m.Storage, combinedFilter3, math.MaxUint64, false, true)
require.NoError(t, err)
require.NotNil(t, backupSchemas)

Expand All @@ -137,7 +139,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {

var policies []*backuppb.PlacementPolicy
_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, testFilter, math.MaxUint64, false, true)
m.Storage, combinedFilter1, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())
// we expect no policies collected, because it's not full backup.
Expand Down Expand Up @@ -170,7 +172,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk.MustExec("insert into t2 values (11);")

_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, noFilter, math.MaxUint64, true, true)
m.Storage, combinedFilter3, math.MaxUint64, true, true)
require.NoError(t, err)
require.Equal(t, 2, backupSchemas.Len())
// we expect the policy fivereplicas collected in full backup.
Expand Down Expand Up @@ -217,9 +219,10 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
`)

f, err := filter.Parse([]string{"test.t3"})
combinedFilter := utils.NewCombinedFilterNoSystem(f)
require.NoError(t, err)

_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, combinedFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -253,7 +256,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
// recover the statistics.
tk.MustExec("analyze table t3 all columns;")

_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(m.Storage, combinedFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -293,8 +296,9 @@ func TestBackupSchemasForSystemTable(t *testing.T) {
}

f, err := filter.Parse([]string{"mysql.systable*"})
combinedFilter := utils.NewCombinedFilterNoUser(f)
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, combinedFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, systemTablesCount, backupSchemas.Len())

Expand All @@ -314,7 +318,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {
schemas2 := GetSchemasFromMeta(t, es2)
require.Len(t, schemas2, systemTablesCount)
for _, schema := range schemas2 {
require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name)
require.Equal(t, utils.WithTemporaryDBNamePrefix("mysql"), schema.DB.Name)
require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix))
}
}
6 changes: 3 additions & 3 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
require.False(t, exists)
err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
Progress: checkpoint.InLogRestoreAndIdMapPersist,
Progress: checkpoint.InLogRestoreAndIdMapPersisted,
})
require.NoError(t, err)
progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
Expand All @@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
require.Equal(t, true, taskInfo.HasSnapshotMetadata)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)

exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
require.False(t, exists)
Expand Down
20 changes: 10 additions & 10 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,22 +194,22 @@ func ExistsLogRestoreCheckpointMetadata(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
}

// A progress type for snapshot + log restore.
// RestoreProgress is a progress type for snapshot + log restore.
//
// Before the id-maps is persist into external storage, the snapshot restore and
// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
// Before the id-maps is persisted into external storage, the snapshot restore and
// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
// it can retry from snapshot restore.
//
// After the id-maps is persist into external storage, there are some meta-kvs has
// been restored into the cluster, such as `rename ddl`. Where would be a situation:
// After the id-maps is persisted into external storage, there are some meta-kvs has
// been restored into the cluster, such as `rename ddl`. A situation could be:
//
// the first execution:
//
// table A created in snapshot restore is renamed to table B in log restore
// table A (id 80) --------------> table B (id 80)
// ( snapshot restore ) ( log restore )
//
// the second execution if don't skip snasphot restore:
// the second execution if don't skip snapshot restore:
//
// table A is created again in snapshot restore, because there is no table named A
// table A (id 81) --------------> [not in id-maps, so ignored]
Expand All @@ -221,8 +221,8 @@ type RestoreProgress int

const (
InSnapshotRestore RestoreProgress = iota
// Only when the id-maps is persist, status turns into it.
InLogRestoreAndIdMapPersist
// Only when the id-maps is persisted, status turns into it.
InLogRestoreAndIdMapPersisted
)

type CheckpointProgress struct {
Expand Down Expand Up @@ -254,8 +254,8 @@ func ExistsCheckpointProgress(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
}

// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
// restore task executed for this cluster.
// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
// It represents the last restore task executed in this cluster.
type CheckpointTaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/import_mode_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (switcher *ImportModeSwitcher) GoSwitchToImportMode(
return nil
}

// RestorePreWork executes some prepare work before restore.
// RestorePreWork switches to import mode and removes pd schedulers if needed
// TODO make this function returns a restore post work.
func RestorePreWork(
ctx context.Context,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log_client",
srcs = [
"batch_meta_processor.go",
"client.go",
"compacted_file_strategy.go",
"import.go",
Expand Down Expand Up @@ -36,6 +37,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//pkg/ddl/util",
Expand All @@ -47,7 +49,6 @@ go_library(
"//pkg/util/codec",
"//pkg/util/redact",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
Expand All @@ -71,7 +72,6 @@ go_library(
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

Expand Down Expand Up @@ -103,6 +103,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/utiltest",
"//pkg/domain",
Expand All @@ -117,7 +118,6 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
Loading

0 comments on commit d2b68b8

Please sign in to comment.