
Commit

importinto, disttask: allow multi import job & adjust total memory for cgroup v2 (pingcap#51575)

ref pingcap#49008
D3Hunter authored Mar 14, 2024
1 parent 9545923 commit dbdae85
Showing 8 changed files with 77 additions and 42 deletions.
1 change: 1 addition & 0 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/cgroup",
"//pkg/util/cpu",
"//pkg/util/gctuner",
"//pkg/util/intest",
17 changes: 17 additions & 0 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/cgroup"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
@@ -92,6 +93,22 @@ func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager,
if totalCPU <= 0 || totalMem <= 0 {
return nil, errors.Errorf("invalid cpu or memory, cpu: %d, memory: %d", totalCPU, totalMem)
}
cgroupLimit, version, err := cgroup.GetCgroupMemLimit()
// ignore the error from cgroup.GetCgroupMemLimit, as it's not a step that must succeed.
if err == nil && version == cgroup.V2 {
// see cgroup.detectMemLimitInV2 for more details.
// below are some real memory limits tested on GCP:
// node-spec real-limit percent
// 16c32g 27.83Gi 87%
// 32c64g 57.36Gi 89.6%
// we adjust based on 'limit' rather than totalMem, since totalMem = min(physical-mem, 'limit');
// the content of 'memory.max' might be 'max', so we take the min of the two.
adjustedMem := min(totalMem, uint64(float64(cgroupLimit)*0.88))
logger.Info("adjust memory limit for cgroup v2",
zap.String("before", units.BytesSize(float64(totalMem))),
zap.String("after", units.BytesSize(float64(adjustedMem))))
totalMem = adjustedMem
}
logger.Info("build task executor manager", zap.Int("total-cpu", totalCPU),
zap.String("total-mem", units.BytesSize(float64(totalMem))))
m := &Manager{
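The hunk above caps the task executor's memory budget at roughly 88% of the cgroup v2 limit, in line with the GCP measurements quoted in the comment. A minimal standalone sketch of that adjustment; the adjustForCgroupV2 helper and the byte values are illustrative, not part of the patch:

```go
package main

import "fmt"

// adjustForCgroupV2 mirrors the adjustment in NewManager: cgroup v2 may report
// only the leaf cgroup's limit (or a literal "max"), so the budget is capped at
// ~88% of it and never raised above the original total.
func adjustForCgroupV2(totalMem, cgroupLimit uint64) uint64 {
	adjusted := uint64(float64(cgroupLimit) * 0.88)
	if adjusted < totalMem {
		return adjusted
	}
	return totalMem
}

func main() {
	const gib = uint64(1) << 30
	// A 32 GiB pod limit yields a ~28 GiB budget, close to the 87-89.6%
	// real limits observed on GCP in the comment above.
	fmt.Println(adjustForCgroupV2(32*gib, 32*gib) / gib) // 28
}
```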
1 change: 0 additions & 1 deletion pkg/disttask/importinto/BUILD.bazel
@@ -54,7 +54,6 @@ go_library(
"//pkg/table/tables",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/disttask",
"//pkg/util/etcd",
"//pkg/util/logutil",
16 changes: 2 additions & 14 deletions pkg/disttask/importinto/job.go
@@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/util"
@@ -70,18 +69,6 @@ func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instanc
if err = taskManager.WithNewTxn(ctx, func(se sessionctx.Context) error {
var err2 error
exec := se.(sqlexec.SQLExecutor)
// If 2 client try to execute IMPORT INTO concurrently, there's chance that both of them will pass the check.
// We can enforce ONLY one import job running by:
// - using LOCK TABLES, but it requires enable-table-lock=true, it's not enabled by default.
// - add a key to PD as a distributed lock, but it's a little complex, and we might support job queuing later.
// So we only add this simple soft check here and doc it.
activeJobCnt, err2 := importer.GetActiveJobCnt(ctx, exec)
if err2 != nil {
return err2
}
if activeJobCnt > 0 {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("there's pending or running jobs")
}
jobID, err2 = importer.CreateJob(ctx, exec, plan.DBName, plan.TableInfo.Name.L, plan.TableInfo.ID,
plan.User, plan.Parameters, plan.TotalFileSize)
if err2 != nil {
@@ -125,7 +112,8 @@ func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instanc
zap.Int64("job-id", jobID),
zap.Int64("task-id", task.ID),
zap.String("data-size", units.BytesSize(float64(plan.TotalFileSize))),
zap.Int("thread-cnt", plan.ThreadCnt))
zap.Int("thread-cnt", plan.ThreadCnt),
zap.Bool("global-sort", plan.IsGlobalSort()))

return jobID, task, nil
}
8 changes: 4 additions & 4 deletions pkg/executor/importer/import.go
@@ -1276,13 +1276,13 @@ func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo {
}

// IsLocalSort returns true if we sort data on local disk.
func (e *LoadDataController) IsLocalSort() bool {
return e.Plan.CloudStorageURI == ""
func (p *Plan) IsLocalSort() bool {
return p.CloudStorageURI == ""
}

// IsGlobalSort returns true if we sort data on global storage.
func (e *LoadDataController) IsGlobalSort() bool {
return !e.IsLocalSort()
func (p *Plan) IsGlobalSort() bool {
return !p.IsLocalSort()
}

// CreateColAssignExprs creates the column assignment expressions using session context.
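Moving IsLocalSort and IsGlobalSort from LoadDataController onto Plan lets code that only holds a *importer.Plan, such as doSubmitTask above, which now logs the sort mode, query it directly, assuming the controller still reaches these methods through its embedded plan. A rough sketch of the call pattern; the trimmed Plan type and the storage URI are invented for illustration:

```go
package main

import "fmt"

// Plan is a trimmed stand-in for importer.Plan; only the field consulted by
// the sort-mode helpers is kept.
type Plan struct {
	CloudStorageURI string
}

// IsLocalSort reports whether data is sorted on local disk.
func (p *Plan) IsLocalSort() bool { return p.CloudStorageURI == "" }

// IsGlobalSort reports whether data is sorted on external (global) storage.
func (p *Plan) IsGlobalSort() bool { return !p.IsLocalSort() }

func main() {
	p := &Plan{CloudStorageURI: "s3://sort-bucket/prefix"} // illustrative URI
	fmt.Println(p.IsGlobalSort())                          // true
}
```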
38 changes: 33 additions & 5 deletions pkg/util/cgroup/cgroup_memory.go
@@ -26,9 +26,26 @@ import (
"github.com/pingcap/log"
)

// Version represents the cgroup version.
type Version int

// cgroup versions.
const (
Unknown Version = 0
V1 Version = 1
V2 Version = 2
)

// GetMemoryLimit attempts to retrieve the cgroup memory limit for the current
// process.
func GetMemoryLimit() (limit uint64, err error) {
limit, _, err = getCgroupMemLimit("/")
return
}

// GetCgroupMemLimit attempts to retrieve the cgroup memory limit for the current
// process, and also returns the cgroup version.
func GetCgroupMemLimit() (uint64, Version, error) {
return getCgroupMemLimit("/")
}

@@ -118,41 +135,46 @@ func getCgroupMemUsage(root string) (usage uint64, err error) {
}

// root is always "/" in the production. It will be changed for testing.
func getCgroupMemLimit(root string) (limit uint64, err error) {
func getCgroupMemLimit(root string) (limit uint64, version Version, err error) {
version = Unknown
path, err := detectControlPath(filepath.Join(root, procPathCGroup), "memory")
if err != nil {
return 0, err
return 0, version, err
}

if path == "" {
log.Warn("no cgroup memory controller detected")
return 0, nil
return 0, version, nil
}

mount, ver, err := getCgroupDetails(filepath.Join(root, procPathMountInfo), path, "memory")
if err != nil {
return 0, err
return 0, version, err
}

if len(ver) == 2 {
version = V1
limit, err = detectMemLimitInV1(filepath.Join(root, mount[0]))
if err != nil {
version = V2
limit, err = detectMemLimitInV2(filepath.Join(root, mount[1], path))
}
} else {
switch ver[0] {
case 1:
// cgroupv1
version = V1
limit, err = detectMemLimitInV1(filepath.Join(root, mount[0]))
case 2:
// cgroupv2
version = V2
limit, err = detectMemLimitInV2(filepath.Join(root, mount[0], path))
default:
limit, err = 0, fmt.Errorf("detected unknown cgroup version index: %d", ver)
}
}

return limit, err
return limit, version, err
}

func detectMemLimitInV1(cRoot string) (limit uint64, err error) {
@@ -162,6 +184,12 @@ func detectMemLimitInV1(cRoot string) (limit uint64, err error) {
// TODO(hawkingrei): this implementation was based on podman+criu environment.
// It may not cover all the cases when v2 becomes more widely used in the container
// world.
// In a K8S environment, the value held in memory.max is the memory limit defined in the pod
// definition; the real memory limit should be the minimum value in the whole cgroup hierarchy,
// as in cgroup V1, but cgroup V2 lacks that feature, see this patch for more details:
// https://lore.kernel.org/linux-kernel/[email protected]/T/
// So in cgroup V2 the value is better adjusted by some factor, such as 0.9, to
// avoid OOM. See taskexecutor/manager.go too.
func detectMemLimitInV2(cRoot string) (limit uint64, err error) {
return readInt64Value(cRoot, cgroupV2MemLimit, 2)
}
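Since getCgroupMemLimit now also reports which cgroup version supplied the limit, the exported GetCgroupMemLimit lets callers branch on it, as taskexecutor/manager.go does above. A small consumer-side sketch using only the API shown in this file; the printed messages are illustrative:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/util/cgroup"
)

func main() {
	limit, version, err := cgroup.GetCgroupMemLimit()
	if err != nil {
		fmt.Println("cgroup memory limit unavailable:", err)
		return
	}
	switch version {
	case cgroup.V1:
		fmt.Printf("cgroup v1 limit: %d bytes\n", limit)
	case cgroup.V2:
		// Under v2 the limit may come from a literal "max" in memory.max,
		// in which case it is effectively math.MaxInt64, so callers such as
		// taskexecutor/manager.go cap their budget below it.
		fmt.Printf("cgroup v2 limit: %d bytes\n", limit)
	default:
		// Version stays Unknown when no memory controller is detected.
		fmt.Println("no cgroup memory limit detected")
	}
}
```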
32 changes: 20 additions & 12 deletions pkg/util/cgroup/cgroup_mock_test.go
@@ -211,14 +211,14 @@ func TestCgroupsGetMemoryInactiveFileUsage(t *testing.T) {

func TestCgroupsGetMemoryLimit(t *testing.T) {
for _, tc := range []struct {
name string
paths map[string]string
errMsg string
limit uint64
warn string
name string
paths map[string]string
errMsg string
limit uint64
warn string
version Version
}{
{

errMsg: "failed to read memory cgroup from cgroups file:",
},
{
@@ -248,15 +248,17 @@ func TestCgroupsGetMemoryLimit(t *testing.T) {
"/proc/self/mountinfo": v1MountsWithMemController,
"/sys/fs/cgroup/memory/memory.stat": v1MemoryStat,
},
limit: 2936016896,
limit: 2936016896,
version: V1,
},
{
paths: map[string]string{
"/proc/self/cgroup": v1CgroupWithMemoryControllerNS,
"/proc/self/mountinfo": v1MountsWithMemControllerNS,
"/sys/fs/cgroup/memory/cgroup_test/memory.stat": v1MemoryStat,
},
limit: 2936016896,
limit: 2936016896,
version: V1,
},
{
paths: map[string]string{
@@ -279,30 +281,36 @@ func TestCgroupsGetMemoryLimit(t *testing.T) {
"/proc/self/mountinfo": v2Mounts,
"/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/memory.max": "1073741824\n",
},
limit: 1073741824,
limit: 1073741824,
version: V2,
},
{
paths: map[string]string{
"/proc/self/cgroup": v2CgroupWithMemoryController,
"/proc/self/mountinfo": v2Mounts,
"/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/memory.max": "max\n",
},
limit: 9223372036854775807,
limit: 9223372036854775807,
version: V2,
},
{
paths: map[string]string{
"/proc/self/cgroup": v1CgroupWithEccentricMemoryController,
"/proc/self/mountinfo": v1MountsWithEccentricMemController,
"/sys/fs/cgroup/memory/memory.stat": v1MemoryStat,
},
limit: 2936016896,
limit: 2936016896,
version: V1,
},
} {
dir := createFiles(t, tc.paths)
limit, err := getCgroupMemLimit(dir)
limit, version, err := getCgroupMemLimit(dir)
require.True(t, isError(err, tc.errMsg),
"%v %v", err, tc.errMsg)
require.Equal(t, tc.limit, limit)
if err == nil {
require.Equal(t, tc.version, version)
}
}
}

6 changes: 0 additions & 6 deletions tests/realtikvtest/importintotest/import_into_test.go
@@ -44,7 +44,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/sem"
"github.com/stretchr/testify/require"
@@ -958,11 +957,6 @@ func (s *mockGCSSuite) TestRegisterTask() {
}()
// wait for the task to be registered
<-importinto.TestSyncChan
// cannot run 2 import job at the same time
tk2 := testkit.NewTestKit(s.T(), s.store)
err = tk2.QueryToErr(sql)
s.ErrorIs(err, exeerrors.ErrLoadDataPreCheckFailed)
s.ErrorContains(err, "there's pending or running jobs")

client, err := importer.GetEtcdClient()
s.NoError(err)
