From 895fb49772366a36851e3ce1fd674e7dc72478fa Mon Sep 17 00:00:00 2001 From: Eric Lam Date: Tue, 29 Oct 2024 11:15:42 +0000 Subject: [PATCH] Add stage cache to avoid mutilple SQL call to get the same StageDef (#19586) When we do SELECT * with the datalink type, each datalink URL with stage:// schema will call a SQL to get the corresponding stage definition from mo_catalog.mo_stage table. Mostly likely, all datalink URL share same stage definition but we still call the same SQL for every URLs. Add a cache in process to cache the Stage Defintion and use the cached value with the same stage name. Approved by: @zhangxu19830126, @m-schen, @badboynt1, @qingxinhome, @aunjgr, @sukki37, @daviszhen --- pkg/common/stage/stage_test.go | 37 --- pkg/{common => }/datalink/datalink.go | 9 +- pkg/{common => }/datalink/datalink_test.go | 0 pkg/{common => }/datalink/docx/docx.go | 0 pkg/{common => }/datalink/docx/docx_test.go | 0 .../datalink/docx/testfiles/sample.xml | 0 .../datalink/docx/testfiles/test.docx | Bin .../datalink/docx/testfiles/test2.docx | Bin .../datalink/docx/testfiles/text.txt | 0 pkg/{common => }/datalink/docx/types.go | 0 pkg/{common => }/datalink/pdf/pdf.go | 0 pkg/frontend/authenticate.go | 5 +- pkg/{common => }/fulltext/fulltext.go | 0 pkg/{common => }/fulltext/fulltext_test.go | 0 pkg/{common => }/fulltext/types.go | 0 pkg/sql/colexec/evalExpression.go | 2 +- pkg/sql/colexec/table_function/fulltext.go | 2 +- .../table_function/fulltext_tokenize.go | 4 +- pkg/sql/colexec/table_function/stage.go | 6 +- pkg/sql/compile/compile.go | 12 +- pkg/sql/compile/compile2.go | 2 + pkg/sql/plan/function/func_unary.go | 2 +- pkg/sql/plan/utils.go | 5 +- pkg/sql/plan/utils_test.go | 2 +- pkg/sql/util/eval_expr_util.go | 2 +- pkg/stage/stage.go | 186 +++++++++++++++ pkg/stage/stage_test.go | 182 +++++++++++++++ .../stage.go => stage/stageutil/stageutil.go} | 216 +++--------------- pkg/stage/stageutil/stageutil_test.go | 80 +++++++ pkg/vm/process/process2.go | 2 + pkg/vm/process/types.go | 8 + 31 files changed, 520 insertions(+), 244 deletions(-) delete mode 100644 pkg/common/stage/stage_test.go rename pkg/{common => }/datalink/datalink.go (93%) rename pkg/{common => }/datalink/datalink_test.go (100%) rename pkg/{common => }/datalink/docx/docx.go (100%) rename pkg/{common => }/datalink/docx/docx_test.go (100%) rename pkg/{common => }/datalink/docx/testfiles/sample.xml (100%) rename pkg/{common => }/datalink/docx/testfiles/test.docx (100%) rename pkg/{common => }/datalink/docx/testfiles/test2.docx (100%) rename pkg/{common => }/datalink/docx/testfiles/text.txt (100%) rename pkg/{common => }/datalink/docx/types.go (100%) rename pkg/{common => }/datalink/pdf/pdf.go (100%) rename pkg/{common => }/fulltext/fulltext.go (100%) rename pkg/{common => }/fulltext/fulltext_test.go (100%) rename pkg/{common => }/fulltext/types.go (100%) create mode 100644 pkg/stage/stage.go create mode 100644 pkg/stage/stage_test.go rename pkg/{common/stage/stage.go => stage/stageutil/stageutil.go} (53%) create mode 100644 pkg/stage/stageutil/stageutil_test.go diff --git a/pkg/common/stage/stage_test.go b/pkg/common/stage/stage_test.go deleted file mode 100644 index 31103b5f06a7..000000000000 --- a/pkg/common/stage/stage_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2024 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package stage - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestS3ServiceProvider(t *testing.T) { - protocol, err := getS3ServiceFromProvider("cos") - require.Nil(t, err) - assert.Equal(t, protocol, "s3") - - protocol, err = getS3ServiceFromProvider("amazon") - require.Nil(t, err) - assert.Equal(t, protocol, "s3") - - protocol, err = getS3ServiceFromProvider("minio") - require.Nil(t, err) - assert.Equal(t, protocol, "minio") - -} diff --git a/pkg/common/datalink/datalink.go b/pkg/datalink/datalink.go similarity index 93% rename from pkg/common/datalink/datalink.go rename to pkg/datalink/datalink.go index b78f13243edf..a49d515eb1b3 100644 --- a/pkg/common/datalink/datalink.go +++ b/pkg/datalink/datalink.go @@ -21,11 +21,12 @@ import ( "strconv" "strings" - "github.com/matrixorigin/matrixone/pkg/common/datalink/docx" - "github.com/matrixorigin/matrixone/pkg/common/datalink/pdf" "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/common/stage" + "github.com/matrixorigin/matrixone/pkg/datalink/docx" + "github.com/matrixorigin/matrixone/pkg/datalink/pdf" "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/stage" + "github.com/matrixorigin/matrixone/pkg/stage/stageutil" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -104,7 +105,7 @@ func ParseDatalink(fsPath string, proc *process.Process) (string, []int, error) case stage.FILE_PROTOCOL: moUrl = strings.Join([]string{u.Host, u.Path}, "") case stage.STAGE_PROTOCOL: - moUrl, _, err = stage.UrlToPath(fsPath, proc) + moUrl, _, err = stageutil.UrlToPath(fsPath, proc) if err != nil { return "", nil, err } diff --git a/pkg/common/datalink/datalink_test.go b/pkg/datalink/datalink_test.go similarity index 100% rename from pkg/common/datalink/datalink_test.go rename to pkg/datalink/datalink_test.go diff --git a/pkg/common/datalink/docx/docx.go b/pkg/datalink/docx/docx.go similarity index 100% rename from pkg/common/datalink/docx/docx.go rename to pkg/datalink/docx/docx.go diff --git a/pkg/common/datalink/docx/docx_test.go b/pkg/datalink/docx/docx_test.go similarity index 100% rename from pkg/common/datalink/docx/docx_test.go rename to pkg/datalink/docx/docx_test.go diff --git a/pkg/common/datalink/docx/testfiles/sample.xml b/pkg/datalink/docx/testfiles/sample.xml similarity index 100% rename from pkg/common/datalink/docx/testfiles/sample.xml rename to pkg/datalink/docx/testfiles/sample.xml diff --git a/pkg/common/datalink/docx/testfiles/test.docx b/pkg/datalink/docx/testfiles/test.docx similarity index 100% rename from pkg/common/datalink/docx/testfiles/test.docx rename to pkg/datalink/docx/testfiles/test.docx diff --git a/pkg/common/datalink/docx/testfiles/test2.docx b/pkg/datalink/docx/testfiles/test2.docx similarity index 100% rename from pkg/common/datalink/docx/testfiles/test2.docx rename to pkg/datalink/docx/testfiles/test2.docx diff --git a/pkg/common/datalink/docx/testfiles/text.txt b/pkg/datalink/docx/testfiles/text.txt similarity index 100% rename from pkg/common/datalink/docx/testfiles/text.txt rename to pkg/datalink/docx/testfiles/text.txt diff --git a/pkg/common/datalink/docx/types.go b/pkg/datalink/docx/types.go similarity index 100% rename from pkg/common/datalink/docx/types.go rename to pkg/datalink/docx/types.go diff --git a/pkg/common/datalink/pdf/pdf.go b/pkg/datalink/pdf/pdf.go similarity index 100% rename from pkg/common/datalink/pdf/pdf.go rename to pkg/datalink/pdf/pdf.go diff --git a/pkg/frontend/authenticate.go b/pkg/frontend/authenticate.go index 990efef5af77..38a55432017b 100644 --- a/pkg/frontend/authenticate.go +++ b/pkg/frontend/authenticate.go @@ -38,7 +38,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/common/pubsub" - "github.com/matrixorigin/matrixone/pkg/common/stage" "github.com/matrixorigin/matrixone/pkg/config" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/defines" @@ -56,6 +55,8 @@ import ( plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/sql/util" + "github.com/matrixorigin/matrixone/pkg/stage" + "github.com/matrixorigin/matrixone/pkg/stage/stageutil" "github.com/matrixorigin/matrixone/pkg/taskservice" "github.com/matrixorigin/matrixone/pkg/util/metric/mometric" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -3349,7 +3350,7 @@ func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (e filePath = ep.FilePath if strings.HasPrefix(filePath, stage.STAGE_PROTOCOL+"://") { // stage:// URL - s, err := stage.UrlToStageDef(filePath, ses.proc) + s, err := stageutil.UrlToStageDef(filePath, ses.proc) if err != nil { return err } diff --git a/pkg/common/fulltext/fulltext.go b/pkg/fulltext/fulltext.go similarity index 100% rename from pkg/common/fulltext/fulltext.go rename to pkg/fulltext/fulltext.go diff --git a/pkg/common/fulltext/fulltext_test.go b/pkg/fulltext/fulltext_test.go similarity index 100% rename from pkg/common/fulltext/fulltext_test.go rename to pkg/fulltext/fulltext_test.go diff --git a/pkg/common/fulltext/types.go b/pkg/fulltext/types.go similarity index 100% rename from pkg/common/fulltext/types.go rename to pkg/fulltext/types.go diff --git a/pkg/sql/colexec/evalExpression.go b/pkg/sql/colexec/evalExpression.go index 4e3c3f72fcd6..3cf2836c4150 100644 --- a/pkg/sql/colexec/evalExpression.go +++ b/pkg/sql/colexec/evalExpression.go @@ -19,7 +19,6 @@ import ( "fmt" "math" - "github.com/matrixorigin/matrixone/pkg/common/datalink" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/common/reuse" @@ -27,6 +26,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/datalink" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" diff --git a/pkg/sql/colexec/table_function/fulltext.go b/pkg/sql/colexec/table_function/fulltext.go index 0533b3529655..882a00be08e5 100644 --- a/pkg/sql/colexec/table_function/fulltext.go +++ b/pkg/sql/colexec/table_function/fulltext.go @@ -18,12 +18,12 @@ import ( "fmt" "strings" - "github.com/matrixorigin/matrixone/pkg/common/fulltext" "github.com/matrixorigin/matrixone/pkg/common/moerr" moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fulltext" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/util/executor" "github.com/matrixorigin/matrixone/pkg/vm/process" diff --git a/pkg/sql/colexec/table_function/fulltext_tokenize.go b/pkg/sql/colexec/table_function/fulltext_tokenize.go index cb4c5e81d457..23a4fc050f8b 100644 --- a/pkg/sql/colexec/table_function/fulltext_tokenize.go +++ b/pkg/sql/colexec/table_function/fulltext_tokenize.go @@ -18,13 +18,13 @@ import ( "encoding/json" "strings" - "github.com/matrixorigin/matrixone/pkg/common/datalink" - "github.com/matrixorigin/matrixone/pkg/common/fulltext" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/bytejson" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/datalink" + "github.com/matrixorigin/matrixone/pkg/fulltext" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" diff --git a/pkg/sql/colexec/table_function/stage.go b/pkg/sql/colexec/table_function/stage.go index 0eaa96753ab4..a230e55e97db 100644 --- a/pkg/sql/colexec/table_function/stage.go +++ b/pkg/sql/colexec/table_function/stage.go @@ -20,12 +20,12 @@ import ( "strings" "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/common/stage" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/stage/stageutil" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -82,7 +82,7 @@ func stageList(proc *process.Process, tableFunction *TableFunction, filepath str return nil } - s, err := stage.UrlToStageDef(string(filepath), proc) + s, err := stageutil.UrlToStageDef(string(filepath), proc) if err != nil { return err } @@ -105,7 +105,7 @@ func stageList(proc *process.Process, tableFunction *TableFunction, filepath str pattern = path.Clean("/" + pattern) - fileList, err := stage.StageListWithPattern(service, pattern, proc) + fileList, err := stageutil.StageListWithPattern(service, pattern, proc) if err != nil { return err } diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 2c31eb1047cf..b0a749ac52f6 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -468,6 +468,14 @@ func (c *Compile) prePipelineInitializer() (err error) { // run once func (c *Compile) runOnce() (err error) { //c.printPipeline() + + // defer cleanup at the end of runOnce() + defer func() { + // cleanup post dml sql and stage cache + c.proc.Base.PostDmlSqlList.Clear() + c.proc.Base.StageCache.Clear() + }() + if c.IsTpQuery() && len(c.scopes) == 1 { if err = c.run(c.scopes[0]); err != nil { return err @@ -522,10 +530,6 @@ func (c *Compile) runOnce() (err error) { } } - // cleanup post dml sql - defer func() { - c.proc.Base.PostDmlSqlList.Clear() - }() for _, sql := range c.proc.Base.PostDmlSqlList.Values() { err = c.runSql(sql) if err != nil { diff --git a/pkg/sql/compile/compile2.go b/pkg/sql/compile/compile2.go index c0ff50873fb3..753df1e90337 100644 --- a/pkg/sql/compile/compile2.go +++ b/pkg/sql/compile/compile2.go @@ -329,6 +329,8 @@ func (c *Compile) prepareRetry(defChanged bool) (*Compile, error) { // clear PostDmlSqlList c.proc.GetPostDmlSqlList().Clear() + // clear stage cache + c.proc.GetStageCache().Clear() // FIXME: the current retry method is quite bad, the overhead is relatively large, and needs to be // improved to refresh expression in the future. diff --git a/pkg/sql/plan/function/func_unary.go b/pkg/sql/plan/function/func_unary.go index 6c0fcc7df8ec..e74b435d2b03 100644 --- a/pkg/sql/plan/function/func_unary.go +++ b/pkg/sql/plan/function/func_unary.go @@ -33,8 +33,8 @@ import ( "time" "unsafe" - "github.com/matrixorigin/matrixone/pkg/common/datalink" "github.com/matrixorigin/matrixone/pkg/common/util" + "github.com/matrixorigin/matrixone/pkg/datalink" "github.com/RoaringBitmap/roaring" "golang.org/x/exp/constraints" diff --git a/pkg/sql/plan/utils.go b/pkg/sql/plan/utils.go index bb5fd4e05a96..9756618b70bf 100644 --- a/pkg/sql/plan/utils.go +++ b/pkg/sql/plan/utils.go @@ -31,7 +31,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" - "github.com/matrixorigin/matrixone/pkg/common/stage" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" @@ -45,6 +44,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/sql/plan/rule" "github.com/matrixorigin/matrixone/pkg/sql/util" + "github.com/matrixorigin/matrixone/pkg/stage" + "github.com/matrixorigin/matrixone/pkg/stage/stageutil" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -1569,7 +1570,7 @@ func InitInfileOrStageParam(param *tree.ExternParam, proc *process.Process) erro return InitInfileParam(param) } - s, err := stage.UrlToStageDef(fpath, proc) + s, err := stageutil.UrlToStageDef(fpath, proc) if err != nil { return err } diff --git a/pkg/sql/plan/utils_test.go b/pkg/sql/plan/utils_test.go index 461310516ecb..ad734f1288e4 100644 --- a/pkg/sql/plan/utils_test.go +++ b/pkg/sql/plan/utils_test.go @@ -19,8 +19,8 @@ import ( "net/url" "testing" - "github.com/matrixorigin/matrixone/pkg/common/stage" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + "github.com/matrixorigin/matrixone/pkg/stage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/pkg/sql/util/eval_expr_util.go b/pkg/sql/util/eval_expr_util.go index 25bc189ed194..9755edd242c3 100644 --- a/pkg/sql/util/eval_expr_util.go +++ b/pkg/sql/util/eval_expr_util.go @@ -24,12 +24,12 @@ import ( "time" "unicode/utf8" - "github.com/matrixorigin/matrixone/pkg/common/datalink" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/bytejson" "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/datalink" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/vm/process" diff --git a/pkg/stage/stage.go b/pkg/stage/stage.go new file mode 100644 index 000000000000..52636970d1be --- /dev/null +++ b/pkg/stage/stage.go @@ -0,0 +1,186 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stage + +import ( + "context" + "encoding/csv" + "net/url" + "strings" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/fileservice" +) + +const STAGE_PROTOCOL = "stage" +const S3_PROTOCOL = "s3" +const FILE_PROTOCOL = "file" + +const PARAMKEY_AWS_KEY_ID = "aws_key_id" +const PARAMKEY_AWS_SECRET_KEY = "aws_secret_key" +const PARAMKEY_AWS_REGION = "aws_region" +const PARAMKEY_ENDPOINT = "endpoint" +const PARAMKEY_COMPRESSION = "compression" +const PARAMKEY_PROVIDER = "provider" + +const S3_PROVIDER_AMAZON = "amazon" +const S3_PROVIDER_MINIO = "minio" +const S3_PROVIDER_COS = "cos" + +const S3_SERVICE = "s3" +const MINIO_SERVICE = "minio" + +type StageDef struct { + Id uint32 + Name string + Url *url.URL + Credentials map[string]string + Status string +} + +func (s StageDef) GetCredentials(key string, defval string) (string, bool) { + if s.Credentials == nil { + // no credential in this stage + return defval, false + } + + k := strings.ToLower(key) + res, ok := s.Credentials[k] + if !ok { + return defval, false + } + return res, ok +} + +// get stages and expand the path. stage may be a file or s3 +// use the format of path s3,,,,,, +// or minio,,,,,, +// expand the subpath to MO path. +// subpath is in the format like path or path with query like path?q1=v1&q2=v2... +func (s StageDef) ToPath() (mopath string, query string, err error) { + + if s.Url.Scheme == S3_PROTOCOL { + bucket, prefix, query, err := ParseS3Url(s.Url) + if err != nil { + return "", "", err + } + + // get S3 credentials + aws_key_id, found := s.GetCredentials(PARAMKEY_AWS_KEY_ID, "") + if !found { + return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_KEY_ID not found") + } + aws_secret_key, found := s.GetCredentials(PARAMKEY_AWS_SECRET_KEY, "") + if !found { + return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_SECRET_KEY not found") + } + aws_region, found := s.GetCredentials(PARAMKEY_AWS_REGION, "") + if !found { + return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_REGION not found") + } + provider, found := s.GetCredentials(PARAMKEY_PROVIDER, "") + if !found { + return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: PROVIDER not found") + } + endpoint, found := s.GetCredentials(PARAMKEY_ENDPOINT, "") + if !found { + return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: ENDPOINT not found") + } + + service, err := getS3ServiceFromProvider(provider) + if err != nil { + return "", "", err + } + + buf := new(strings.Builder) + w := csv.NewWriter(buf) + opts := []string{service, endpoint, aws_region, bucket, aws_key_id, aws_secret_key, ""} + + if err = w.Write(opts); err != nil { + return "", "", err + } + w.Flush() + return fileservice.JoinPath(buf.String(), prefix), query, nil + } else if s.Url.Scheme == FILE_PROTOCOL { + return s.Url.Path, s.Url.RawQuery, nil + } + return "", "", moerr.NewBadConfigf(context.TODO(), "URL protocol %s not supported", s.Url.Scheme) +} + +func getS3ServiceFromProvider(provider string) (string, error) { + provider = strings.ToLower(provider) + switch provider { + case S3_PROVIDER_COS: + return S3_SERVICE, nil + case S3_PROVIDER_AMAZON: + return S3_SERVICE, nil + case S3_PROVIDER_MINIO: + return MINIO_SERVICE, nil + default: + return "", moerr.NewBadConfigf(context.TODO(), "provider %s not supported", provider) + } +} + +func CredentialsToMap(cred string) (map[string]string, error) { + if len(cred) == 0 { + return nil, nil + } + + opts := strings.Split(cred, ",") + if len(opts) == 0 { + return nil, nil + } + + credentials := make(map[string]string) + for _, o := range opts { + kv := strings.SplitN(o, "=", 2) + if len(kv) != 2 { + return nil, moerr.NewBadConfig(context.TODO(), "Format error: invalid stage credentials") + } + credentials[strings.ToLower(kv[0])] = kv[1] + } + + return credentials, nil +} + +func ParseS3Url(u *url.URL) (bucket, fpath, query string, err error) { + bucket = u.Host + fpath = u.Path + query = u.RawQuery + err = nil + + if len(bucket) == 0 { + err = moerr.NewBadConfig(context.TODO(), "Invalid s3 URL: bucket is empty string") + return "", "", "", err + } + + return +} + +func ParseStageUrl(u *url.URL) (stagename, prefix, query string, err error) { + if u.Scheme != STAGE_PROTOCOL { + return "", "", "", moerr.NewBadConfig(context.TODO(), "ParseStageUrl: URL protocol is not stage://") + } + + stagename = u.Host + if len(stagename) == 0 { + return "", "", "", moerr.NewBadConfig(context.TODO(), "Invalid stage URL: stage name is empty string") + } + + prefix = u.Path + query = u.RawQuery + + return +} diff --git a/pkg/stage/stage_test.go b/pkg/stage/stage_test.go new file mode 100644 index 000000000000..0d3e82adbb53 --- /dev/null +++ b/pkg/stage/stage_test.go @@ -0,0 +1,182 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stage + +import ( + "fmt" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestS3ServiceProvider(t *testing.T) { + protocol, err := getS3ServiceFromProvider("cos") + require.Nil(t, err) + assert.Equal(t, protocol, "s3") + + protocol, err = getS3ServiceFromProvider("amazon") + require.Nil(t, err) + assert.Equal(t, protocol, "s3") + + protocol, err = getS3ServiceFromProvider("minio") + require.Nil(t, err) + assert.Equal(t, protocol, "minio") + +} + +func TestGetCredentials(t *testing.T) { + + c := make(map[string]string) + c[PARAMKEY_AWS_KEY_ID] = "key" + c[PARAMKEY_AWS_SECRET_KEY] = "secret" + c[PARAMKEY_AWS_REGION] = "region" + c[PARAMKEY_ENDPOINT] = "endpoint" + c[PARAMKEY_PROVIDER] = S3_PROVIDER_AMAZON + u, err := url.Parse("s3://bucket/path") + require.Nil(t, err) + s := StageDef{Id: 0, + Name: "mystage", + Url: u, + Credentials: c, + Status: ""} + + // key found + v, ok := s.GetCredentials(PARAMKEY_AWS_KEY_ID, "") + require.True(t, ok) + require.Equal(t, v, "key") + + // key not found and return default value + v, ok = s.GetCredentials("nokey", "default") + require.False(t, ok) + require.Equal(t, v, "default") + + s = StageDef{Id: 0, + Name: "mystage", + Url: u, + Status: ""} + + // credentials is nil and return default value + v, ok = s.GetCredentials("nokey", "default") + require.False(t, ok) + require.Equal(t, v, "default") +} + +func TestToPath(t *testing.T) { + c := make(map[string]string) + c[PARAMKEY_AWS_KEY_ID] = "key" + c[PARAMKEY_AWS_SECRET_KEY] = "secret" + c[PARAMKEY_AWS_REGION] = "region" + c[PARAMKEY_ENDPOINT] = "endpoint" + c[PARAMKEY_PROVIDER] = S3_PROVIDER_AMAZON + + // s3 path + u, err := url.Parse("s3://bucket/path/a.csv") + require.Nil(t, err) + s := StageDef{Id: 0, + Name: "mystage", + Url: u, + Credentials: c, + Status: ""} + + mopath, query, err := s.ToPath() + require.Nil(t, err) + + require.Equal(t, mopath, "s3,endpoint,region,bucket,key,secret,\n:/path/a.csv") + fmt.Printf("mo=%s, query = %s", mopath, query) + + // file path + u, err = url.Parse("file:///tmp/dir/subdir/file.pdf") + require.Nil(t, err) + s = StageDef{Id: 0, + Name: "mystage", + Url: u, + Status: ""} + + mopath, query, err = s.ToPath() + require.Nil(t, err) + require.Equal(t, query, "") + + require.Equal(t, mopath, "/tmp/dir/subdir/file.pdf") + + // invalid schema + u, err = url.Parse("https://localhost/path/file.pdf") + require.Nil(t, err) + s = StageDef{Id: 0, + Name: "mystage", + Url: u, + Status: ""} + + _, _, err = s.ToPath() + require.NotNil(t, err) +} + +func TestToPathFail(t *testing.T) { + c := make(map[string]string) + + // s3 path + u, err := url.Parse("s3://bucket/path/a.csv") + require.Nil(t, err) + s := StageDef{Id: 0, + Name: "mystage", + Url: u, + Credentials: c, + Status: ""} + + // no credentials + _, _, err = s.ToPath() + require.NotNil(t, err) + + // add key and failed with no secret key + c[PARAMKEY_AWS_KEY_ID] = "key" + _, _, err = s.ToPath() + require.NotNil(t, err) + + // add secert and failed with no region + c[PARAMKEY_AWS_SECRET_KEY] = "secret" + _, _, err = s.ToPath() + require.NotNil(t, err) + + // add region and failed with no endpoint + c[PARAMKEY_AWS_REGION] = "region" + _, _, err = s.ToPath() + require.NotNil(t, err) + + // add endpoint and failed with unknown provider + c[PARAMKEY_ENDPOINT] = "endpoint" + _, _, err = s.ToPath() + require.NotNil(t, err) + + // add unknown provider + c[PARAMKEY_PROVIDER] = "unknown" + _, _, err = s.ToPath() + require.NotNil(t, err) +} + +func TestCredentialsToMap(t *testing.T) { + + c := "aws_key_id=key,aws_secret_key=secret,aws_region=region,endpoint=ep,provider=minio" + + cmap, err := CredentialsToMap(c) + require.Nil(t, err) + require.NotNil(t, cmap) + require.Equal(t, cmap[PARAMKEY_AWS_KEY_ID], "key") + require.Equal(t, cmap[PARAMKEY_AWS_SECRET_KEY], "secret") + require.Equal(t, cmap[PARAMKEY_AWS_REGION], "region") + require.Equal(t, cmap[PARAMKEY_ENDPOINT], "ep") + require.Equal(t, cmap[PARAMKEY_PROVIDER], "minio") + +} diff --git a/pkg/common/stage/stage.go b/pkg/stage/stageutil/stageutil.go similarity index 53% rename from pkg/common/stage/stage.go rename to pkg/stage/stageutil/stageutil.go index 2d25fd6a27b6..afec4ac0c315 100644 --- a/pkg/common/stage/stage.go +++ b/pkg/stage/stageutil/stageutil.go @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package stage +package stageutil import ( "container/list" "context" - "encoding/csv" "fmt" "net/url" "path" @@ -27,139 +26,30 @@ import ( moruntime "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/stage" - //"github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/util/executor" "github.com/matrixorigin/matrixone/pkg/vm/process" ) -const STAGE_PROTOCOL = "stage" -const S3_PROTOCOL = "s3" -const FILE_PROTOCOL = "file" - -const PARAMKEY_AWS_KEY_ID = "aws_key_id" -const PARAMKEY_AWS_SECRET_KEY = "aws_secret_key" -const PARAMKEY_AWS_REGION = "aws_region" -const PARAMKEY_ENDPOINT = "endpoint" -const PARAMKEY_COMPRESSION = "compression" -const PARAMKEY_PROVIDER = "provider" - -const S3_PROVIDER_AMAZON = "amazon" -const S3_PROVIDER_MINIO = "minio" -const S3_PROVIDER_COS = "cos" - -const S3_SERVICE = "s3" -const MINIO_SERVICE = "minio" - -type StageDef struct { - Id uint32 - Name string - Url *url.URL - Credentials map[string]string - Status string -} - -func (s *StageDef) GetCredentials(key string, defval string) (string, bool) { - if s.Credentials == nil { - // no credential in this stage - return defval, false - } - - k := strings.ToLower(key) - res, ok := s.Credentials[k] - if !ok { - return defval, false - } - return res, ok -} - -func (s *StageDef) expandSubStage(proc *process.Process) (StageDef, error) { - if s.Url.Scheme == STAGE_PROTOCOL { - stagename, prefix, query, err := ParseStageUrl(s.Url) +func ExpandSubStage(s stage.StageDef, proc *process.Process) (stage.StageDef, error) { + if s.Url.Scheme == stage.STAGE_PROTOCOL { + stagename, prefix, query, err := stage.ParseStageUrl(s.Url) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } res, err := StageLoadCatalog(proc, stagename) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } res.Url = res.Url.JoinPath(prefix) res.Url.RawQuery = query - return res.expandSubStage(proc) - } - - return *s, nil -} - -// get stages and expand the path. stage may be a file or s3 -// use the format of path s3,,,,,, -// or minio,,,,,, -// expand the subpath to MO path. -// subpath is in the format like path or path with query like path?q1=v1&q2=v2... -func (s *StageDef) ToPath() (mopath string, query string, err error) { - - if s.Url.Scheme == S3_PROTOCOL { - bucket, prefix, query, err := ParseS3Url(s.Url) - if err != nil { - return "", "", err - } - - // get S3 credentials - aws_key_id, found := s.GetCredentials(PARAMKEY_AWS_KEY_ID, "") - if !found { - return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_KEY_ID not found") - } - aws_secret_key, found := s.GetCredentials(PARAMKEY_AWS_SECRET_KEY, "") - if !found { - return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_SECRET_KEY not found") - } - aws_region, found := s.GetCredentials(PARAMKEY_AWS_REGION, "") - if !found { - return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: AWS_REGION not found") - } - provider, found := s.GetCredentials(PARAMKEY_PROVIDER, "") - if !found { - return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: PROVIDER not found") - } - endpoint, found := s.GetCredentials(PARAMKEY_ENDPOINT, "") - if !found { - return "", "", moerr.NewBadConfig(context.TODO(), "Stage credentials: ENDPOINT not found") - } - - service, err := getS3ServiceFromProvider(provider) - if err != nil { - return "", "", err - } - - buf := new(strings.Builder) - w := csv.NewWriter(buf) - opts := []string{service, endpoint, aws_region, bucket, aws_key_id, aws_secret_key, ""} - - if err = w.Write(opts); err != nil { - return "", "", err - } - w.Flush() - return fileservice.JoinPath(buf.String(), prefix), query, nil - } else if s.Url.Scheme == FILE_PROTOCOL { - return s.Url.Path, s.Url.RawQuery, nil + return ExpandSubStage(res, proc) } - return "", "", moerr.NewBadConfigf(context.TODO(), "URL protocol %s not supported", s.Url.Scheme) -} -func getS3ServiceFromProvider(provider string) (string, error) { - provider = strings.ToLower(provider) - switch provider { - case S3_PROVIDER_COS: - return S3_SERVICE, nil - case S3_PROVIDER_AMAZON: - return S3_SERVICE, nil - case S3_PROVIDER_MINIO: - return MINIO_SERVICE, nil - default: - return "", moerr.NewBadConfigf(context.TODO(), "provider %s not supported", provider) - } + return s, nil } func runSql(proc *process.Process, sql string) (executor.Result, error) { @@ -179,37 +69,22 @@ func runSql(proc *process.Process, sql string) (executor.Result, error) { return exec.Exec(proc.GetTopContext(), sql, opts) } -func credentialsToMap(cred string) (map[string]string, error) { - if len(cred) == 0 { - return nil, nil - } - - opts := strings.Split(cred, ",") - if len(opts) == 0 { - return nil, nil - } +func StageLoadCatalog(proc *process.Process, stagename string) (s stage.StageDef, err error) { - credentials := make(map[string]string) - for _, o := range opts { - kv := strings.SplitN(o, "=", 2) - if len(kv) != 2 { - return nil, moerr.NewBadConfig(context.TODO(), "Format error: invalid stage credentials") - } - credentials[strings.ToLower(kv[0])] = kv[1] + cache := proc.GetStageCache() + s, ok := cache.Get(stagename) + if ok { + return s, nil } - return credentials, nil -} - -func StageLoadCatalog(proc *process.Process, stagename string) (s StageDef, err error) { getAllStagesSql := fmt.Sprintf("select stage_id, stage_name, url, stage_credentials, stage_status from `%s`.`%s` WHERE stage_name = '%s';", "mo_catalog", "mo_stages", stagename) res, err := runSql(proc, getAllStagesSql) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } defer res.Close() - var reslist []StageDef + var reslist []stage.StageDef const id_idx = 0 const name_idx = 1 const url_idx = 2 @@ -223,28 +98,29 @@ func StageLoadCatalog(proc *process.Process, stagename string) (s StageDef, err stage_name := string(batch.Vecs[name_idx].GetBytesAt(i)) stage_url, err := url.Parse(string(batch.Vecs[url_idx].GetBytesAt(i))) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } stage_cred := string(batch.Vecs[cred_idx].GetBytesAt(i)) - credmap, err := credentialsToMap(stage_cred) + credmap, err := stage.CredentialsToMap(stage_cred) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } stage_status := string(batch.Vecs[status_idx].GetBytesAt(i)) //logutil.Infof("CATALOG: ID %d, stage %s url %s cred %s", stage_id, stage_name, stage_url, stage_cred) - reslist = append(reslist, StageDef{stage_id, stage_name, stage_url, credmap, stage_status}) + reslist = append(reslist, stage.StageDef{Id: stage_id, Name: stage_name, Url: stage_url, Credentials: credmap, Status: stage_status}) } } } } if reslist == nil { - return StageDef{}, moerr.NewBadConfigf(context.TODO(), "Stage %s not found", stagename) + return stage.StageDef{}, moerr.NewBadConfigf(context.TODO(), "Stage %s not found", stagename) } + cache.Set(stagename, reslist[0]) return reslist[0], nil } @@ -258,60 +134,30 @@ func UrlToPath(furl string, proc *process.Process) (path string, query string, e return s.ToPath() } -func ParseStageUrl(u *url.URL) (stagename, prefix, query string, err error) { - if u.Scheme != STAGE_PROTOCOL { - return "", "", "", moerr.NewBadConfig(context.TODO(), "ParseStageUrl: URL protocol is not stage://") - } - - stagename = u.Host - if len(stagename) == 0 { - return "", "", "", moerr.NewBadConfig(context.TODO(), "Invalid stage URL: stage name is empty string") - } - - prefix = u.Path - query = u.RawQuery - - return -} - -func ParseS3Url(u *url.URL) (bucket, fpath, query string, err error) { - bucket = u.Host - fpath = u.Path - query = u.RawQuery - err = nil - - if len(bucket) == 0 { - err = moerr.NewBadConfig(context.TODO(), "Invalid s3 URL: bucket is empty string") - return "", "", "", err - } - - return -} - -func UrlToStageDef(furl string, proc *process.Process) (s StageDef, err error) { +func UrlToStageDef(furl string, proc *process.Process) (s stage.StageDef, err error) { aurl, err := url.Parse(furl) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } - if aurl.Scheme != STAGE_PROTOCOL { - return StageDef{}, moerr.NewBadConfig(context.TODO(), "URL is not stage URL") + if aurl.Scheme != stage.STAGE_PROTOCOL { + return stage.StageDef{}, moerr.NewBadConfig(context.TODO(), "URL is not stage URL") } - stagename, subpath, query, err := ParseStageUrl(aurl) + stagename, subpath, query, err := stage.ParseStageUrl(aurl) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } sdef, err := StageLoadCatalog(proc, stagename) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } - s, err = sdef.expandSubStage(proc) + s, err = ExpandSubStage(sdef, proc) if err != nil { - return StageDef{}, err + return stage.StageDef{}, err } s.Url = s.Url.JoinPath(subpath) diff --git a/pkg/stage/stageutil/stageutil_test.go b/pkg/stage/stageutil/stageutil_test.go new file mode 100644 index 000000000000..898c3e92fce5 --- /dev/null +++ b/pkg/stage/stageutil/stageutil_test.go @@ -0,0 +1,80 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stageutil + +import ( + "net/url" + "testing" + + "github.com/matrixorigin/matrixone/pkg/stage" + "github.com/matrixorigin/matrixone/pkg/testutil" + "github.com/stretchr/testify/require" +) + +func TestStageCache(t *testing.T) { + + proc := testutil.NewProcess() + cache := proc.GetStageCache() + + credentials := make(map[string]string) + credentials["aws_region"] = "region" + credentials["aws_id"] = "id" + credentials["aws_secret"] = "secret" + + rsu, err := url.Parse("file:///tmp") + require.Nil(t, err) + subu, err := url.Parse("stage://rsstage/substage") + require.Nil(t, err) + + cache.Set("rsstage", stage.StageDef{Id: 1, Name: "rsstage", Url: rsu, Credentials: credentials}) + cache.Set("substage", stage.StageDef{Id: 1, Name: "ftstage", Url: subu}) + + // get the final URL totally based on cache value + s, err := UrlToStageDef("stage://substage/a.csv", proc) + require.Nil(t, err) + + require.Equal(t, s.Url.String(), "file:///tmp/substage/a.csv") + require.Equal(t, s.Credentials["aws_region"], "region") + require.Equal(t, s.Credentials["aws_id"], "id") + require.Equal(t, s.Credentials["aws_secret"], "secret") + + // change the local stagedef does not change the cache + s.Url, err = url.Parse("https://localhost/path") + require.Nil(t, err) + + // preserve the cache + rs, ok := cache.Get("rsstage") + require.True(t, ok) + require.Equal(t, rs.Url.String(), "file:///tmp") + ss, ok := cache.Get("substage") + require.True(t, ok) + require.Equal(t, ss.Url.String(), "stage://rsstage/substage") +} + +func TestStageFail(t *testing.T) { + + proc := testutil.NewProcess() + _, err := UrlToStageDef("stage:///path", proc) + require.NotNil(t, err) + + u, err := url.Parse("stage:///path") + require.Nil(t, err) + s := stage.StageDef{Id: 1, Name: "rsstage", Url: u} + _, err = ExpandSubStage(s, proc) + require.NotNil(t, err) + + _, err = UrlToStageDef("not a url", proc) + require.NotNil(t, err) +} diff --git a/pkg/vm/process/process2.go b/pkg/vm/process/process2.go index 0c1062372f0a..dfd1d95dd94c 100644 --- a/pkg/vm/process/process2.go +++ b/pkg/vm/process/process2.go @@ -30,6 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/logservice" "github.com/matrixorigin/matrixone/pkg/perfcounter" qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client" + "github.com/matrixorigin/matrixone/pkg/stage" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/txn/util" "github.com/matrixorigin/matrixone/pkg/udf" @@ -87,6 +88,7 @@ func NewTopProcess( logger: util.GetLogger(sid), UnixTime: time.Now().UnixNano(), PostDmlSqlList: threadsafe.NewSlice[string](), + StageCache: threadsafe.NewMap[string, stage.StageDef](), } proc := &Process{ diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go index b77eebceb1e3..c35c889712dc 100644 --- a/pkg/vm/process/types.go +++ b/pkg/vm/process/types.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/stage" "github.com/matrixorigin/matrixone/pkg/vm/message" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -298,6 +299,9 @@ type BaseProcess struct { // post dml sqls run right after all pipelines finished. PostDmlSqlList *threadsafe.Slice[string] + + // stage cache to avoid to run same stage SQL repeatedly + StageCache *threadsafe.Map[string, stage.StageDef] } // Process contains context used in query execution @@ -458,6 +462,10 @@ func (proc *Process) GetPostDmlSqlList() *threadsafe.Slice[string] { return proc.Base.PostDmlSqlList } +func (proc *Process) GetStageCache() *threadsafe.Map[string, stage.StageDef] { + return proc.Base.StageCache +} + func (si *SessionInfo) GetUser() string { return si.User }