diff --git a/scripts/image_build.sh b/scripts/image_build.sh
index dad1fc2ad8..65ee99cca3 100644
--- a/scripts/image_build.sh
+++ b/scripts/image_build.sh
@@ -99,7 +99,14 @@ if [ "${WITH_SQLFLOW_MODELS:-ON}" = "ON" ]; then
     rm -rf models
 fi
 
-# 7. Load sqlflow Jupyter magic command automatically. c.f. https://stackoverflow.com/a/32683001.
+# 7. Install odpscmd for submitting the ALPS predict job with an ODPS UDF script.
+# TODO(Yancey1989): use gomaxcompute instead of the odpscmd command-line tool.
+wget -q http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/119096/cn_zh/1557995455961/odpscmd_public.zip
+unzip -qq odpscmd_public.zip -d /usr/local/odpscmd
+ln -s /usr/local/odpscmd/bin/odpscmd /usr/local/bin/odpscmd
+rm -rf odpscmd_public.zip
+
+# 8. Load sqlflow Jupyter magic command automatically. c.f. https://stackoverflow.com/a/32683001.
 mkdir -p $IPYTHON_STARTUP
 mkdir -p /workspace
 echo 'get_ipython().magic(u"%reload_ext sqlflow.magic")' >> $IPYTHON_STARTUP/00-first.py
diff --git a/sql/codegen.go b/sql/codegen.go
index b15dcb3426..97be3596fe 100644
--- a/sql/codegen.go
+++ b/sql/codegen.go
@@ -111,7 +111,10 @@ func newFiller(pr *extendedSelect, ds *trainAndValDataset, fts fieldTypes, db *D
 			IsKerasModel: isKerasModel,
 		},
 	}
-	for k, v := range pr.attrs {
+	for k, v := range pr.trainClause.trainAttrs {
+		r.Attrs[k] = v.String()
+	}
+	for k, v := range pr.predictClause.predAttrs {
 		r.Attrs[k] = v.String()
 	}
 
diff --git a/sql/codegen_alps.go b/sql/codegen_alps.go
index 8da4c06a12..32509b1d0c 100644
--- a/sql/codegen_alps.go
+++ b/sql/codegen_alps.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -28,6 +29,9 @@ import (
 	"sqlflow.org/gomaxcompute"
 )
 
+var alpsTrainTemplate = template.Must(template.New("alps_train").Parse(alpsTrainTemplateText))
+var alpsPredTemplate = template.Must(template.New("alps_predict").Parse(alpsPredTemplateText))
+
 type alpsFiller struct {
 	// Training or Predicting
 	IsTraining bool
@@ -52,6 +56,9 @@ type alpsFiller struct {
 	TrainClause  *resolvedTrainClause
 	ExitOnSubmit bool
 
+	// Predict
+	PredictUDF string
+
 	// Feature map
 	FeatureMapTable     string
 	FeatureMapPartition string
@@ -59,6 +66,11 @@ type alpsFiller struct {
 	// ODPS
 	OdpsConf   *gomaxcompute.Config
 	EngineCode string
+
+	// Credential
+	UserID string
+	OSSID  string
+	OSSKey string
 }
 
 type alpsFeatureColumn interface {
@@ -255,13 +267,38 @@ func newALPSTrainFiller(pr *extendedSelect, db *DB, session *pb.Session, ds *tra
 		ExitOnSubmit: exitOnSubmit}, nil
 }
 
-func newALPSPredictFiller(pr *extendedSelect) (*alpsFiller, error) {
-	return nil, fmt.Errorf("alps predict not supported")
+func newALPSPredictFiller(pr *extendedSelect, session *pb.Session) (*alpsFiller, error) {
+	var ossID, ossKey *expr
+	var ok bool
+	if ossID, ok = pr.predAttrs["OSS_ID"]; !ok {
+		return nil, fmt.Errorf("the ALPS predict job should specify OSS_ID")
+	}
+	if ossKey, ok = pr.predAttrs["OSS_KEY"]; !ok {
+		return nil, fmt.Errorf("the ALPS predict job should specify OSS_KEY")
+	}
+	modelDir := fmt.Sprintf("oss://arks-model/%s/%s.tar.gz", session.UserId, pr.predictClause.model)
+
+	return &alpsFiller{
+		IsTraining:         false,
+		PredictInputTable:  pr.tables[0],
+		PredictOutputTable: pr.predictClause.into,
+		// NOTE: alpsPredTemplateText reads {{.PredictInputModel}}; it must be set here.
+		PredictInputModel: pr.predictClause.model,
+		PredictUDF:        strings.Join(pr.fields.Strings(), " "),
+		ModelDir:          modelDir,
+		UserID:            session.UserId,
+		OSSID:             ossID.String(),
+		OSSKey:            ossKey.String(),
+	}, nil
 }
 
-func submitALPS(w *PipeWriter, cwd string, filler *alpsFiller) error {
+func alpsTrain(w *PipeWriter, pr *extendedSelect, db *DB, cwd string, session *pb.Session, ds *trainAndValDataset) error {
 	var program bytes.Buffer
-	if err := alpsTemplate.Execute(&program, filler); err != nil {
+	filler, err := newALPSTrainFiller(pr, db, session, ds)
+	if err != nil {
+		return err
+	}
+
+	if err = alpsTrainTemplate.Execute(&program, filler); err != nil {
 		return fmt.Errorf("submitALPS: failed executing template: %v", err)
 	}
 	code := program.String()
@@ -293,24 +330,54 @@ pip install http://091349.oss-cn-hangzhou-zmf.aliyuncs.com/alps/sqlflow/alps-2.0
 	if e := cmd.Run(); e != nil {
 		return fmt.Errorf("code %v failed %v", code, e)
 	}
-	// TODO(uuleon): save model to DB if train
+	// TODO(uuleon): save model to DB
 	return nil
 }
 
-func alpsTrain(w *PipeWriter, pr *extendedSelect, db *DB, cwd string, session *pb.Session, ds *trainAndValDataset) error {
-	f, err := newALPSTrainFiller(pr, db, session, ds)
+func alpsPred(w *PipeWriter, pr *extendedSelect, db *DB, cwd string, session *pb.Session) error {
+	var program bytes.Buffer
+	filler, err := newALPSPredictFiller(pr, session)
 	if err != nil {
 		return err
 	}
-	return submitALPS(w, cwd, f)
-}
+	if err = alpsPredTemplate.Execute(&program, filler); err != nil {
+		return fmt.Errorf("alpsPred: failed executing template: %v", err)
+	}
 
-func alpsPred(w *PipeWriter, pr *extendedSelect, db *DB, cwd string, session *pb.Session) error {
-	f, err := newALPSPredictFiller(pr)
+	fname := "alps_pre.odps"
+	scriptPath := filepath.Join(cwd, fname)
+	f, err := os.Create(scriptPath)
 	if err != nil {
-		return err
+		return fmt.Errorf("create ODPS script failed: %v", err)
+	}
+	defer os.Remove(scriptPath)
+	f.WriteString(program.String())
+	f.Close()
+	cw := &logChanWriter{wr: w}
+	_, ok := db.Driver().(*gomaxcompute.Driver)
+	if !ok {
+		return fmt.Errorf("the ALPS predict job only supports the MaxCompute database driver")
+	}
+	cfg, err := gomaxcompute.ParseDSN(db.dataSourceName)
+	if err != nil {
+		return fmt.Errorf("parse MaxCompute DSN failed: %v", err)
+	}
+	// FIXME(Yancey1989): use the https protocol once odpscmd supports it.
+	fixedEndpoint := strings.Replace(cfg.Endpoint, "https://", "http://", -1)
+	// TODO(Yancey1989): submit the MaxCompute UDF script using the gomaxcompute driver.
+	cmd := exec.Command("odpscmd",
+		"-u", cfg.AccessID,
+		"-p", cfg.AccessKey,
+		fmt.Sprintf("--endpoint=%s", fixedEndpoint),
+		fmt.Sprintf("--project=%s", cfg.Project),
+		"-s", scriptPath)
+	cmd.Dir = cwd
+	cmd.Stdout = cw
+	cmd.Stderr = cw
+	if e := cmd.Run(); e != nil {
+		return fmt.Errorf("submit ODPS script %s failed %v", program.String(), e)
 	}
-	return submitALPS(w, cwd, f)
+	return nil
 }
 
 func (nc *numericColumn) GenerateAlpsCode(metadata *metadata) ([]string, error) {
@@ -425,135 +492,6 @@ func generateAlpsFeatureColumnCode(fcs []featureColumn, metadata *metadata) ([]s
 	return codes, nil
 }
 
-const alpsTemplateText = `
-# coding: utf-8
-# Copyright (c) Antfin, Inc. All rights reserved.
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-
-import tensorflow as tf
-
-from alps.conf.closure import Closure
-from alps.framework.train.training import build_run_config
-from alps.framework.exporter import ExportStrategy
-from alps.framework.exporter.arks_exporter import ArksExporter
-from alps.client.base import run_experiment, submit_experiment
-from alps.framework.engine import LocalEngine, YarnEngine, ResourceConf
-from alps.framework.column.column import DenseColumn, SparseColumn, GroupedSparseColumn
-from alps.framework.exporter.compare_fn import best_auc_fn
-from alps.io import DatasetX
-from alps.io.base import OdpsConf, FeatureMap
-from alps.framework.experiment import EstimatorBuilder, Experiment, TrainConf, EvalConf, RuntimeConf
-from alps.io.reader.odps_reader import OdpsReader
-
-os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # for debug usage.
-#tf.logging.set_verbosity(tf.logging.INFO)
-
-class SQLFlowEstimatorBuilder(EstimatorBuilder):
-    def _build(self, experiment, run_config):
-{{if ne .FeatureMapTable ""}}
-        feature_columns = []
-        {{.FeatureColumnCode}}
-{{end}}
-{{if ne .ImportCode ""}}
-        {{.ImportCode}}
-{{end}}
-        return {{.ModelCreatorCode}}
-
-if __name__ == "__main__":
-    odpsConf=OdpsConf(
-        accessid="{{.OdpsConf.AccessID}}",
-        accesskey="{{.OdpsConf.AccessKey}}",
-        endpoint="{{.OdpsConf.Endpoint}}",
-        project="{{.OdpsConf.Project}}"
-    )
-
-    trainDs = DatasetX(
-        num_epochs={{.TrainClause.Epoch}},
-        batch_size={{.TrainClause.BatchSize}},
-        shuffle="{{.TrainClause.EnableShuffle}}" == "true",
-        shuffle_buffer_size={{.TrainClause.ShuffleBufferSize}},
-{{if .TrainClause.EnableCache}}
-        cache_file={{.TrainClause.CachePath}},
-{{end}}
-        reader=OdpsReader(
-            odps=odpsConf,
-            project="{{.OdpsConf.Project}}",
-            table="{{.TrainInputTable}}",
-            # FIXME(typhoonzero): add field_names back if needed.
-            # field_names={{.Fields}},
-            features={{.X}},
-            labels={{.Y}},
-{{if ne .FeatureMapTable ""}}
-            feature_map=FeatureMap(table="{{.FeatureMapTable}}",
-{{if ne .FeatureMapPartition ""}}
-                partition="{{.FeatureMapPartition}}"
-{{end}}
-            ),
-            flatten_group=True
-{{end}}
-        ),
-        drop_remainder="{{.TrainClause.DropRemainder}}" == "true"
-    )
-
-    evalDs = DatasetX(
-        num_epochs=1,
-        batch_size={{.TrainClause.BatchSize}},
-        reader=OdpsReader(
-            odps=odpsConf,
-            project="{{.OdpsConf.Project}}",
-            table="{{.EvalInputTable}}",
-            # FIXME(typhoonzero): add field_names back if needed.
-            # field_names={{.Fields}},
-            features={{.X}},
-            labels={{.Y}},
-            flatten_group=True
-        )
-    )
-
-    export_path = "{{.ModelDir}}"
-{{if ne .ScratchDir ""}}
-    runtime_conf = RuntimeConf(model_dir="{{.ScratchDir}}")
-{{else}}
-    runtime_conf = None
-{{end}}
-    experiment = Experiment(
-        user="shangchun.sun",  # TODO(joyyoj) pai will check user name be a valid user, removed later.
-        engine={{.EngineCode}},
-        train=TrainConf(input=trainDs,
-{{if (ne .TrainClause.MaxSteps -1)}}
-                        max_steps={{.TrainClause.MaxSteps}},
-{{end}}
-                        ),
-        eval=EvalConf(input=evalDs,
-                      # FIXME(typhoonzero): Support configure metrics
-                      metrics_set=['accuracy'],
-{{if (ne .TrainClause.EvalSteps -1)}}
-                      steps={{.TrainClause.EvalSteps}},
-{{end}}
-                      start_delay_secs={{.TrainClause.EvalStartDelay}},
-                      throttle_secs={{.TrainClause.EvalThrottle}},
-                      ),
-        # FIXME(typhoonzero): Use ExportStrategy.BEST when possible.
-        exporter=ArksExporter(deploy_path=export_path, strategy=ExportStrategy.LATEST, compare_fn=Closure(best_auc_fn)),
-        runtime = runtime_conf,
-        model_builder=SQLFlowEstimatorBuilder())
-
-    if isinstance(experiment.engine, LocalEngine):
-        run_experiment(experiment)
-    else:
-        if "{{.ExitOnSubmit}}" == "false":
-            submit_experiment(experiment)
-        else:
-            submit_experiment(experiment, exit_on_submit=True)
-`
-
-var alpsTemplate = template.Must(template.New("alps").Parse(alpsTemplateText))
-
 type metadata struct {
 	odpsConfig *gomaxcompute.Config
 	table      string
diff --git a/sql/executor.go b/sql/executor.go
index a7f11941ec..960d8431d9 100644
--- a/sql/executor.go
+++ b/sql/executor.go
@@ -66,7 +66,8 @@ func splitExtendedSQL(slct string) []string {
 	}
 	for i := 1; i < len(typ)-2; i++ {
 		if (typ[i] == TRAIN && typ[i+1] == IDENT && typ[i+2] == WITH) ||
-			(typ[i] == PREDICT && typ[i+1] == IDENT && typ[i+2] == USING) {
+			(typ[i] == PREDICT && typ[i+1] == IDENT && typ[i+2] == USING) ||
+			(typ[i] == PREDICT && typ[i+1] == IDENT && typ[i+2] == WITH) {
 			return []string{slct[:pos[i-1]], slct[pos[i-1]:]}
 		}
 	}
@@ -243,7 +244,6 @@ func runExtendedSQL(slct string, db *DB, modelDir string, session *pb.Session) *
 	defer func(startAt time.Time) {
 		log.Debugf("runExtendedSQL %v finished, elapsed:%v", slct, time.Since(startAt))
 	}(time.Now())
-
 	pr, e := newParser().Parse(slct)
 	if e != nil {
 		return e
diff --git a/sql/expression_resolver.go b/sql/expression_resolver.go
index 6b6c0d3664..4616b1bb9d 100644
--- a/sql/expression_resolver.go
+++ b/sql/expression_resolver.go
@@ -191,7 +191,7 @@ func getEngineSpec(attrs map[string]*attribute) engineSpec {
 func resolveTrainClause(tc *trainClause) (*resolvedTrainClause, error) {
 	modelName := tc.estimator
 	preMadeModel := !strings.ContainsAny(modelName, ".")
-	attrs, err := resolveTrainAttribute(&tc.attrs)
+	attrs, err := resolveTrainAttribute(&tc.trainAttrs)
 	if err != nil {
 		return nil, err
 	}
diff --git a/sql/expression_resolver_test.go b/sql/expression_resolver_test.go
index e2849c9e25..99f1825c73 100644
--- a/sql/expression_resolver_test.go
+++ b/sql/expression_resolver_test.go
@@ -262,7 +262,7 @@ func TestAttrs(t *testing.T) {
 	s := statementWithAttrs("estimator.hidden_units = [10, 20]")
 	r, e := parser.Parse(s)
 	a.NoError(e)
-	attrs, err := resolveTrainAttribute(&r.attrs)
+	attrs, err := resolveTrainAttribute(&r.trainAttrs)
 	a.NoError(err)
 	attr := attrs["estimator.hidden_units"]
 	a.Equal("estimator", attr.Prefix)
@@ -272,7 +272,7 @@ func TestAttrs(t *testing.T) {
 	s = statementWithAttrs("dataset.name = hello")
 	r, e = parser.Parse(s)
 	a.NoError(e)
-	attrs, err = resolveTrainAttribute(&r.attrs)
+	attrs, err = resolveTrainAttribute(&r.trainAttrs)
 	a.NoError(err)
 	attr = attrs["dataset.name"]
 	a.Equal("dataset", attr.Prefix)
@@ -286,7 +286,7 @@ func TestExecResource(t *testing.T) {
 	s := statementWithAttrs("exec.worker_num = 2")
 	r, e := parser.Parse(s)
 	a.NoError(e)
-	attrs, err := resolveTrainAttribute(&r.attrs)
+	attrs, err := resolveTrainAttribute(&r.trainAttrs)
 	a.NoError(err)
 	attr := attrs["exec.worker_num"]
 	fmt.Println(attr)
diff --git a/sql/parser.go b/sql/parser.go
index 1b7c3e02c0..66efa90aa9 100644
--- a/sql/parser.go
+++ b/sql/parser.go
@@ -90,11 +90,11 @@ type standardSelect struct {
 }
 
 type trainClause struct {
-	estimator string
-	attrs     attrs
-	columns   columnClause
-	label     string
-	save      string
+	estimator  string
+	trainAttrs attrs
+	columns    columnClause
+	label      string
+	save       string
 }
 
 /* If no FOR in the COLUMN, the key is "" */
@@ -104,8 +104,9 @@ type filedClause exprlist
 type attrs map[string]*expr
 
 type predictClause struct {
-	model string
-	into  string
+	predAttrs attrs
+	model     string
+	into      string
 }
 
 var parseResult *extendedSelect
@@ -120,7 +121,7 @@ func attrsUnion(as1, as2 attrs) attrs {
 	return as1
 }
 
-//line sql.y:105
+//line sql.y:106
 type sqlSymType struct {
 	yys int
 	val string /* NUMBER, IDENT, STRING, and keywords */
@@ -210,7 +211,7 @@ const sqlEofCode = 1
const sqlErrCode = 2
 const sqlInitialStackSize = 16
 
-//line sql.y:280
+//line sql.y:279
 
 /* Like Lisp's builtin function cdr. */
 func (e *expr) cdr() (r []string) {
@@ -326,65 +327,65 @@ var sqlExca = [...]int{
 
 const sqlPrivate = 57344
 
-const sqlLast = 164
+const sqlLast = 169
 
 var sqlAct = [...]int{
-	29, 98, 57, 97, 13, 79, 82, 80, 110, 78,
-	92, 79, 22, 38, 90, 36, 56, 37, 49, 50,
+	29, 101, 57, 100, 13, 83, 82, 80, 79, 81,
+	95, 80, 22, 92, 13, 114, 111, 56, 38, 17,
 	46, 45, 44, 48, 47, 39, 40, 41, 42, 43,
-	51, 108, 107, 53, 54, 77, 7, 9, 8, 10,
-	11, 89, 65, 66, 67, 68, 69, 70, 71, 72,
-	73, 74, 75, 76, 63, 108, 24, 23, 25, 86,
-	79, 60, 18, 17, 91, 52, 4, 31, 24, 23,
-	25, 30, 41, 42, 43, 27, 62, 85, 32, 31,
-	28, 100, 87, 30, 105, 15, 106, 27, 21, 113,
-	32, 101, 28, 99, 102, 101, 96, 16, 104, 111,
-	24, 23, 25, 39, 40, 41, 42, 43, 109, 101,
-	112, 31, 83, 84, 64, 30, 61, 34, 33, 27,
-	20, 103, 32, 55, 28, 49, 50, 46, 45, 44,
-	48, 47, 39, 40, 41, 42, 43, 46, 45, 44,
-	48, 47, 39, 40, 41, 42, 43, 35, 94, 95,
-	58, 59, 3, 81, 12, 26, 19, 14, 6, 93,
-	88, 5, 2, 1,
+	51, 88, 80, 53, 54, 78, 18, 112, 112, 94,
+	91, 36, 66, 67, 68, 69, 70, 71, 72, 73,
+	74, 75, 76, 77, 64, 24, 23, 25, 7, 9,
+	8, 10, 11, 37, 91, 61, 31, 86, 17, 93,
+	30, 41, 42, 43, 27, 63, 52, 32, 87, 28,
+	24, 23, 25, 89, 109, 117, 110, 21, 4, 115,
+	113, 91, 106, 104, 84, 30, 105, 99, 104, 27,
+	85, 108, 32, 55, 28, 24, 23, 25, 39, 40,
+	41, 42, 43, 104, 116, 65, 31, 62, 34, 33,
+	30, 20, 35, 107, 27, 60, 58, 32, 59, 28,
+	49, 50, 46, 45, 44, 48, 47, 39, 40, 41,
+	42, 43, 46, 45, 44, 48, 47, 39, 40, 41,
+	42, 43, 103, 15, 97, 98, 3, 12, 26, 19,
+	14, 6, 96, 90, 102, 16, 5, 2, 1,
}
 
 var sqlPact = [...]int{
-	148, -1000, 31, 68, -1000, 28, 27, 103, 70, 51,
-	101, 100, -1000, 131, -23, -19, -1000, -1000, -1000, -25,
-	-1000, -1000, 105, -1000, -19, -1000, -1000, 51, 46, -1000,
-	51, 51, 83, 140, 138, 25, 99, 39, 97, 51,
-	51, 51, 51, 51, 51, 51, 51, 51, 51, 51,
-	51, -2, -32, -1000, -1000, -1000, -33, 105, 95, 96,
-	51, -1000, -1000, 22, -1000, 43, 43, -1000, -1000, -1000,
-	76, 76, 76, 76, 76, 115, 115, -1000, -1000, 51,
-	-1000, 3, -1000, 40, -1000, -27, -1000, 105, 137, 95,
-	64, 51, -1000, 107, 64, 67, -1000, 17, -1000, -1000,
-	-19, -1000, 105, 91, -7, -1000, -1000, 82, 64, -1000,
-	72, -1000, -1000, -1000,
+	152, -1000, 53, 136, -1000, 33, 1, 104, 69, 88,
+	102, 101, -1000, 106, 3, 27, -1000, -1000, -1000, -21,
+	-1000, -1000, 110, -1000, 27, -1000, -1000, 88, 57, -1000,
+	88, 88, 63, 116, 115, 29, 100, 38, 98, 88,
+	88, 88, 88, 88, 88, 88, 88, 88, 88, 88,
+	88, -2, -33, -1000, -1000, -1000, -31, 110, 77, 83,
+	77, 88, -1000, -1000, -6, -1000, 42, 42, -1000, -1000,
+	-1000, 81, 81, 81, 81, 81, 120, 120, -1000, -1000,
+	88, -1000, 2, -1000, 45, -1000, 26, -27, -1000, 110,
+	143, 77, 135, 88, 75, -1000, 109, 135, 67, -1000,
+	0, -1000, -1000, 27, -1000, 110, -1000, 73, -1, -1000,
+	-1000, 72, 135, -1000, 68, -1000, -1000, -1000,
}
 
 var sqlPgo = [...]int{
-	0, 163, 162, 161, 160, 159, 158, 157, 156, 2,
-	0, 1, 16, 155, 3, 154, 6, 153,
+	0, 168, 167, 166, 163, 162, 161, 160, 159, 2,
+	0, 1, 16, 158, 3, 157, 5, 6,
}
 
 var sqlR1 = [...]int{
 	0, 1, 1, 1, 2, 2, 2, 2, 3, 6,
-	4, 4, 4, 15, 15, 7, 7, 7, 11, 11,
-	11, 14, 14, 5, 5, 8, 8, 16, 17, 17,
-	10, 10, 12, 12, 13, 13, 9, 9, 9, 9,
+	6, 4, 4, 4, 15, 15, 7, 7, 7, 11,
+	11, 11, 14, 14, 5, 5, 8, 8, 16, 17,
+	17, 10, 10, 12, 12, 13, 13, 9, 9, 9,
 	9, 9, 9, 9, 9, 9, 9, 9, 9, 9,
-	9, 9, 9, 9, 9, 9, 9,
+	9, 9, 9, 9, 9, 9, 9, 9,
}
 
 var sqlR2 = [...]int{
 	0, 2, 3, 3, 2, 3, 3, 3, 8, 4,
-	2, 4, 5, 5, 1, 1, 1, 3, 1, 1,
-	1, 1, 3, 2, 2, 1, 3, 3, 1, 3,
-	3, 4, 1, 3, 2, 3, 1, 1, 1, 1,
+	6, 2, 4, 5, 5, 1, 1, 1, 3, 1,
+	1, 1, 1, 3, 2, 2, 1, 3, 3, 1,
+	3, 3, 4, 1, 3, 2, 3, 1, 1, 1,
+	1, 3, 3, 1, 3, 3, 3, 3, 3, 3,
-	3, 3, 1, 3, 3, 3, 3, 3, 3, 3,
-	3, 3, 3, 3, 3, 2, 2,
+	3, 3, 3, 3, 3, 3, 2, 2,
}
 
@@ -394,27 +395,27 @@ var sqlChk = [...]int{
 	32, 28, 39, 17, 17, 16, 38, 36, 38, 27,
 	28, 29, 30, 31, 24, 23, 22, 26, 25, 20,
 	21, -9, 19, -9, -9, 40, -12, -9, 10, 13,
-	36, 17, 37, -12, 17, -9, -9, -9, -9, -9,
-	-9, -9, -9, -9, -9, -9, -9, 37, 41, 38,
-	40, -17, -16, 17, 17, -12, 37, -9, -4, 38,
-	11, 24, 37, -5, 11, 12, -16, -14, -11, 29,
-	17, -10, -9, 14, -14, 17, 19, 15, 38, 17,
-	15, 17, -11, 17,
+	10, 36, 17, 37, -12, 17, -9, -9, -9, -9,
+	-9, -9, -9, -9, -9, -9, -9, -9, 37, 41,
+	38, 40, -17, -16, 17, 17, -17, -12, 37, -9,
+	-4, 38, 11, 24, 13, 37, -5, 11, 12, -16,
+	-14, -11, 29, 17, -10, -9, 17, 14, -14, 17,
+	19, 15, 38, 17, 15, 17, -11, 17,
}
 
 var sqlDef = [...]int{
 	0, -2, 0, 0, 1, 0, 0, 0, 0, 0,
-	0, 0, 4, 0, 14, 16, 15, 2, 3, 5,
-	25, 6, 7, 36, 37, 38, 39, 0, 0, 42,
+	0, 0, 4, 0, 15, 17, 16, 2, 3, 5,
+	26, 6, 7, 37, 38, 39, 40, 0, 0, 43,
 	0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 	0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-	0, 0, 0, 55, 56, 34, 0, 32, 0, 0,
-	0, 17, 30, 0, 26, 43, 44, 45, 46, 47,
-	48, 49, 50, 51, 52, 53, 54, 40, 41, 0,
-	35, 0, 28, 0, 9, 0, 31, 33, 0, 0,
-	0, 0, 13, 0, 0, 0, 29, 10, 21, 18,
-	19, 20, 27, 0, 0, 23, 24, 0, 0, 8,
-	0, 11, 22, 12,
+	0, 0, 0, 56, 57, 35, 0, 33, 0, 0,
+	0, 0, 18, 31, 0, 27, 44, 45, 46, 47,
+	48, 49, 50, 51, 52, 53, 54, 55, 41, 42,
+	0, 36, 0, 29, 0, 9, 0, 0, 32, 34,
+	0, 0, 0, 0, 0, 14, 0, 0, 0, 30,
+	11, 22, 19, 20, 21, 28, 10, 0, 0, 24,
+	25, 0, 0, 8, 0, 12, 23, 13,
}
 
 var sqlTok1 = [...]int{
@@ -778,7 +779,7 @@ sqldefault:
 	case 1:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:147
+//line sql.y:148
 		{
 			parseResult = &extendedSelect{
 				extended: false,
 	case 2:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:152
+//line sql.y:153
 		{
 			parseResult = &extendedSelect{
 				extended: true,
 	case 3:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:159
+//line sql.y:160
 		{
 			parseResult = &extendedSelect{
 				extended: true,
@@ -806,324 +807,332 @@ sqldefault:
 	case 4:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:169
+//line sql.y:170
 		{
 			sqlVAL.slct.fields = sqlDollar[2].expl
 		}
 	case 5:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:170
+//line sql.y:171
 		{
 			sqlVAL.slct.tables = sqlDollar[3].tbls
 		}
 	case 6:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:171
+//line sql.y:172
 		{
 			sqlVAL.slct.limit = sqlDollar[3].val
 		}
 	case 7:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:172
+//line sql.y:173
 		{
 			sqlVAL.slct.where = sqlDollar[3].expr
 		}
 	case 8:
 		sqlDollar = sqlS[sqlpt-8 : sqlpt+1]
-//line sql.y:176
+//line sql.y:177
 		{
 			sqlVAL.tran.estimator = sqlDollar[2].val
-			sqlVAL.tran.attrs = sqlDollar[4].atrs
+			sqlVAL.tran.trainAttrs = sqlDollar[4].atrs
 			sqlVAL.tran.columns = sqlDollar[5].colc
 			sqlVAL.tran.label = sqlDollar[6].labc
 			sqlVAL.tran.save = sqlDollar[8].val
 		}
 	case 9:
 		sqlDollar = sqlS[sqlpt-4 : sqlpt+1]
-//line sql.y:186
+//line sql.y:187
 		{
 			sqlVAL.infr.into = sqlDollar[2].val
 			sqlVAL.infr.model = sqlDollar[4].val
 		}
 	case 10:
+		sqlDollar = sqlS[sqlpt-6 : sqlpt+1]
+//line sql.y:188
+		{
+			sqlVAL.infr.into = sqlDollar[2].val
+			sqlVAL.infr.predAttrs = sqlDollar[4].atrs
+			sqlVAL.infr.model = sqlDollar[6].val
+		}
+	case 11:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:193
+//line sql.y:192
 		{
 			sqlVAL.colc = map[string]exprlist{"feature_columns": sqlDollar[2].expl}
 		}
-	case 11:
+	case 12:
 		sqlDollar = sqlS[sqlpt-4 : sqlpt+1]
-//line sql.y:194
+//line sql.y:193
 		{
 			sqlVAL.colc = map[string]exprlist{sqlDollar[4].val: sqlDollar[2].expl}
 		}
-	case 12:
+	case 13:
 		sqlDollar = sqlS[sqlpt-5 : sqlpt+1]
-//line sql.y:195
+//line sql.y:194
 		{
 			sqlVAL.colc[sqlDollar[5].val] = sqlDollar[3].expl
 		}
-	case 13:
+	case 14:
 		sqlDollar = sqlS[sqlpt-5 : sqlpt+1]
-//line sql.y:199
+//line sql.y:198
 		{
 			sqlVAL.expl = exprlist{sqlDollar[1].expr, atomic(IDENT, "AS"), funcall("", sqlDollar[4].expl)}
 		}
-	case 14:
+	case 15:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:202
+//line sql.y:201
 		{
 			sqlVAL.expl = sqlDollar[1].flds
 		}
-	case 15:
+	case 16:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:206
+//line sql.y:205
 		{
 			sqlVAL.flds = append(sqlVAL.flds, atomic(IDENT, "*"))
 		}
-	case 16:
+	case 17:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:207
+//line sql.y:206
 		{
 			sqlVAL.flds = append(sqlVAL.flds, atomic(IDENT, sqlDollar[1].val))
 		}
-	case 17:
+	case 18:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:208
+//line sql.y:207
 		{
 			sqlVAL.flds = append(sqlDollar[1].flds, atomic(IDENT, sqlDollar[3].val))
 		}
-	case 18:
+	case 19:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:212
+//line sql.y:211
 		{
 			sqlVAL.expr = atomic(IDENT, "*")
 		}
-	case 19:
+	case 20:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:213
+//line sql.y:212
 		{
 			sqlVAL.expr = atomic(IDENT, sqlDollar[1].val)
 		}
-	case 20:
+	case 21:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:214
+//line sql.y:213
 		{
 			sqlVAL.expr = sqlDollar[1].expr
 		}
-	case 21:
+	case 22:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:218
+//line sql.y:217
 		{
 			sqlVAL.expl = exprlist{sqlDollar[1].expr}
 		}
-	case 22:
+	case 23:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:219
+//line sql.y:218
 		{
 			sqlVAL.expl = append(sqlDollar[1].expl, sqlDollar[3].expr)
 		}
-	case 23:
+	case 24:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:223
+//line sql.y:222
 		{
 			sqlVAL.labc = sqlDollar[2].val
 		}
-	case 24:
+	case 25:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:224
+//line sql.y:223
 		{
 			sqlVAL.labc = sqlDollar[2].val[1 : len(sqlDollar[2].val)-1]
 		}
-	case 25:
+	case 26:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:228
+//line sql.y:227
 		{
 			sqlVAL.tbls = []string{sqlDollar[1].val}
 		}
-	case 26:
+	case 27:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:229
+//line sql.y:228
 		{
 			sqlVAL.tbls = append(sqlDollar[1].tbls, sqlDollar[3].val)
 		}
-	case 27:
+	case 28:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:233
+//line sql.y:232
 		{
 			sqlVAL.atrs = attrs{sqlDollar[1].val: sqlDollar[3].expr}
 		}
-	case 28:
+	case 29:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:237
+//line sql.y:236
 		{
 			sqlVAL.atrs = sqlDollar[1].atrs
 		}
-	case 29:
+	case 30:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:238
+//line sql.y:237
 		{
 			sqlVAL.atrs = attrsUnion(sqlDollar[1].atrs, sqlDollar[3].atrs)
 		}
-	case 30:
+	case 31:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:242
+//line sql.y:241
 		{
 			sqlVAL.expr = funcall(sqlDollar[1].val, nil)
 		}
-	case 31:
+	case 32:
 		sqlDollar = sqlS[sqlpt-4 : sqlpt+1]
-//line sql.y:243
+//line sql.y:242
 		{
 			sqlVAL.expr = funcall(sqlDollar[1].val, sqlDollar[3].expl)
 		}
-	case 32:
+	case 33:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:247
+//line sql.y:246
 		{
 			sqlVAL.expl = exprlist{sqlDollar[1].expr}
 		}
-	case 33:
+	case 34:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:248
+//line sql.y:247
 		{
 			sqlVAL.expl = append(sqlDollar[1].expl, sqlDollar[3].expr)
 		}
-	case 34:
+	case 35:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:252
+//line sql.y:251
 		{
 			sqlVAL.expl = nil
 		}
-	case 35:
+	case 36:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:253
+//line sql.y:252
 		{
 			sqlVAL.expl = sqlDollar[2].expl
 		}
-	case 36:
+	case 37:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:257
+//line sql.y:256
 		{
 			sqlVAL.expr = atomic(NUMBER, sqlDollar[1].val)
 		}
-	case 37:
+	case 38:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:258
+//line sql.y:257
 		{
 			sqlVAL.expr = atomic(IDENT, sqlDollar[1].val)
 		}
-	case 38:
+	case 39:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:259
+//line sql.y:258
 		{
 			sqlVAL.expr = atomic(STRING, sqlDollar[1].val)
 		}
-	case 39:
+	case 40:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:260
+//line sql.y:259
 		{
 			sqlVAL.expr = variadic('[', "square", sqlDollar[1].expl)
 		}
-	case 40:
+	case 41:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:261
+//line sql.y:260
 		{
 			sqlVAL.expr = unary('(', "paren", sqlDollar[2].expr)
 		}
-	case 41:
+	case 42:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:262
+//line sql.y:261
 		{
 			sqlVAL.expr = unary('"', "quota", atomic(STRING, sqlDollar[2].val))
 		}
-	case 42:
+	case 43:
 		sqlDollar = sqlS[sqlpt-1 : sqlpt+1]
-//line sql.y:263
+//line sql.y:262
 		{
 			sqlVAL.expr = sqlDollar[1].expr
 		}
-	case 43:
+	case 44:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:264
+//line sql.y:263
 		{
 			sqlVAL.expr = binary('+', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 44:
+	case 45:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:265
+//line sql.y:264
 		{
 			sqlVAL.expr = binary('-', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 45:
+	case 46:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:266
+//line sql.y:265
 		{
 			sqlVAL.expr = binary('*', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 46:
+	case 47:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:267
+//line sql.y:266
 		{
 			sqlVAL.expr = binary('/', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 47:
+	case 48:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:268
+//line sql.y:267
 		{
 			sqlVAL.expr = binary('%', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 48:
+	case 49:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:269
+//line sql.y:268
 		{
 			sqlVAL.expr = binary('=', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 49:
+	case 50:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:270
+//line sql.y:269
 		{
 			sqlVAL.expr = binary('<', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 50:
+	case 51:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:271
+//line sql.y:270
 		{
 			sqlVAL.expr = binary('>', sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 51:
+	case 52:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:272
+//line sql.y:271
 		{
 			sqlVAL.expr = binary(LE, sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 52:
+	case 53:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:273
+//line sql.y:272
 		{
 			sqlVAL.expr = binary(GE, sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 53:
+	case 54:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:274
+//line sql.y:273
 		{
 			sqlVAL.expr = binary(AND, sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 54:
+	case 55:
 		sqlDollar = sqlS[sqlpt-3 : sqlpt+1]
-//line sql.y:275
+//line sql.y:274
 		{
 			sqlVAL.expr = binary(OR, sqlDollar[1].expr, sqlDollar[2].val, sqlDollar[3].expr)
 		}
-	case 55:
+	case 56:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:276
+//line sql.y:275
 		{
 			sqlVAL.expr = unary(NOT, sqlDollar[1].val, sqlDollar[2].expr)
 		}
-	case 56:
+	case 57:
 		sqlDollar = sqlS[sqlpt-2 : sqlpt+1]
-//line sql.y:277
+//line sql.y:276
 		{
 			sqlVAL.expr = unary('-', sqlDollar[1].val, sqlDollar[2].expr)
 		}
diff --git a/sql/parser_test.go b/sql/parser_test.go
index 3a01a6ef83..20a5c48f16 100644
--- a/sql/parser_test.go
+++ b/sql/parser_test.go
@@ -55,6 +55,13 @@ INTO sqlflow_models.my_dnn_model;
 `
 	testPredictSelect = testStandardSelectStmt + `PREDICT db.table.field USING sqlflow_models.my_dnn_model;`
+
+	testMaxcomputeUDFPredict = `
+SELECT predict_fun(concat(",", col_1, col_2)) AS (info, score) FROM db.table
+PREDICT db.predict_result
+WITH OSS_KEY=a, OSS_ID=b
+USING sqlflow_models.my_model;
+	`
 )
 
 func TestStandardSelect(t *testing.T) {
@@ -81,8 +88,8 @@ func TestTrainParser(t *testing.T) {
 	a.True(r.extended)
 	a.True(r.train)
 	a.Equal("DNNClassifier", r.estimator)
-	a.Equal("[10, 20]", r.attrs["hidden_units"].String())
-	a.Equal("3", r.attrs["n_classes"].String())
+	a.Equal("[10, 20]", r.trainAttrs["hidden_units"].String())
+	a.Equal("3", r.trainAttrs["n_classes"].String())
 	a.Equal(`employee.name`,
 		r.columns["feature_columns"][0].String())
 	a.Equal(`bucketize(last_name, 1000)`,
@@ -101,8 +108,8 @@ func TestMultiColumnTrainParser(t *testing.T) {
 	a.True(r.extended)
 	a.True(r.train)
 	a.Equal("DNNClassifier", r.estimator)
-	a.Equal("[10, 20]", r.attrs["hidden_units"].String())
-	a.Equal("3", r.attrs["n_classes"].String())
+	a.Equal("[10, 20]", r.trainAttrs["hidden_units"].String())
+	a.Equal("3", r.trainAttrs["n_classes"].String())
 	a.Equal(`employee.name`,
 		r.columns["feature_columns"][0].String())
 	a.Equal(`bucketize(last_name, 1000)`,
@@ -148,13 +155,14 @@ func TestStandardDropTable(t *testing.T) {
 
 func TestSelectMaxcomputeUDF(t *testing.T) {
 	a := assert.New(t)
-	slct := "SELECT func(func2(\"arg0\", arg1), arg_2) AS (info, score) FROM a_table where a_table.col_1 > 100;"
-	pr, _ := newParser().Parse(slct)
-	expFields := []string{
-		"func(func2(\"arg0\", arg1), arg_2)",
-		"AS",
-		"(info, score)",
-	}
-	a.Equal(pr.fields.Strings(), expFields)
-	a.Equal(pr.tables[0], "a_table")
+	r, e := newParser().Parse(testMaxcomputeUDFPredict)
+	a.NoError(e)
+	// NOTE: testify's assert.Equal takes the expected value first.
+	a.Equal(3, len(r.fields.Strings()))
+	a.Equal(`predict_fun(concat(",", col_1, col_2))`, r.fields[0].String())
+	a.Equal(`AS`, r.fields[1].String())
+	a.Equal(`(info, score)`, r.fields[2].String())
+	a.Equal("db.predict_result", r.predictClause.into)
+	a.Equal("a", r.predAttrs["OSS_KEY"].String())
+	a.Equal("b", r.predAttrs["OSS_ID"].String())
+	a.Equal("sqlflow_models.my_model", r.predictClause.model)
 }
diff --git a/sql/sql.y b/sql/sql.y
index 13732b62af..740c2c41c1 100644
--- a/sql/sql.y
+++ b/sql/sql.y
@@ -72,7 +72,7 @@
 type trainClause struct {
 	estimator string
-	attrs attrs
+	trainAttrs attrs
 	columns columnClause
 	label string
 	save string
@@ -85,6 +85,7 @@
 type attrs map[string]*expr
 
 type predictClause struct {
+	predAttrs attrs
 	model string
 	into string
 }
@@ -175,7 +176,7 @@ select
 train_clause
 : TRAIN IDENT WITH attrs column_clause label_clause INTO IDENT {
 	$$.estimator = $2
-	$$.attrs = $4
+	$$.trainAttrs = $4
 	$$.columns = $5
 	$$.label = $6
 	$$.save = $8
@@ -183,10 +184,8 @@ train_clause
 ;
 
 predict_clause
-: PREDICT IDENT USING IDENT {
-	$$.into = $2
-	$$.model = $4
-}
+: PREDICT IDENT USING IDENT { $$.into = $2; $$.model = $4 }
+| PREDICT IDENT WITH attrs USING IDENT { $$.into = $2; $$.predAttrs = $4; $$.model = $6 }
 ;
 
 column_clause
diff --git a/sql/template_alps.go b/sql/template_alps.go
new file mode 100644
index 0000000000..4d269e789b
--- /dev/null
+++ b/sql/template_alps.go
@@ -0,0 +1,164 @@
+// Copyright 2019 The SQLFlow Authors. All rights reserved.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+const alpsTrainTemplateText = `
+# coding: utf-8
+# Copyright (c) Antfin, Inc. All rights reserved.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+
+import tensorflow as tf
+
+from alps.conf.closure import Closure
+from alps.framework.train.training import build_run_config
+from alps.framework.exporter import ExportStrategy
+from alps.framework.exporter.arks_exporter import ArksExporter
+from alps.client.base import run_experiment, submit_experiment
+from alps.framework.engine import LocalEngine, YarnEngine, ResourceConf
+from alps.framework.column.column import DenseColumn, SparseColumn, GroupedSparseColumn
+from alps.framework.exporter.compare_fn import best_auc_fn
+from alps.io import DatasetX
+from alps.io.base import OdpsConf, FeatureMap
+from alps.framework.experiment import EstimatorBuilder, Experiment, TrainConf, EvalConf, RuntimeConf
+from alps.io.reader.odps_reader import OdpsReader
+
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # for debug usage.
+#tf.logging.set_verbosity(tf.logging.INFO)
+
+class SQLFlowEstimatorBuilder(EstimatorBuilder):
+    def _build(self, experiment, run_config):
+{{if ne .FeatureMapTable ""}}
+        feature_columns = []
+        {{.FeatureColumnCode}}
+{{end}}
+{{if ne .ImportCode ""}}
+        {{.ImportCode}}
+{{end}}
+        return {{.ModelCreatorCode}}
+
+if __name__ == "__main__":
+    odpsConf=OdpsConf(
+        accessid="{{.OdpsConf.AccessID}}",
+        accesskey="{{.OdpsConf.AccessKey}}",
+        endpoint="{{.OdpsConf.Endpoint}}",
+        project="{{.OdpsConf.Project}}"
+    )
+
+    trainDs = DatasetX(
+        num_epochs={{.TrainClause.Epoch}},
+        batch_size={{.TrainClause.BatchSize}},
+        shuffle="{{.TrainClause.EnableShuffle}}" == "true",
+        shuffle_buffer_size={{.TrainClause.ShuffleBufferSize}},
+{{if .TrainClause.EnableCache}}
+        cache_file={{.TrainClause.CachePath}},
+{{end}}
+        reader=OdpsReader(
+            odps=odpsConf,
+            project="{{.OdpsConf.Project}}",
+            table="{{.TrainInputTable}}",
+            # FIXME(typhoonzero): add field_names back if needed.
+            # field_names={{.Fields}},
+            features={{.X}},
+            labels={{.Y}},
+{{if ne .FeatureMapTable ""}}
+            feature_map=FeatureMap(table="{{.FeatureMapTable}}",
+{{if ne .FeatureMapPartition ""}}
+                partition="{{.FeatureMapPartition}}"
+{{end}}
+            ),
+            flatten_group=True
+{{end}}
+        ),
+        drop_remainder="{{.TrainClause.DropRemainder}}" == "true"
+    )
+
+    evalDs = DatasetX(
+        num_epochs=1,
+        batch_size={{.TrainClause.BatchSize}},
+        reader=OdpsReader(
+            odps=odpsConf,
+            project="{{.OdpsConf.Project}}",
+            table="{{.EvalInputTable}}",
+            # FIXME(typhoonzero): add field_names back if needed.
+            # field_names={{.Fields}},
+            features={{.X}},
+            labels={{.Y}},
+            flatten_group=True
+        )
+    )
+
+    export_path = "{{.ModelDir}}"
+{{if ne .ScratchDir ""}}
+    runtime_conf = RuntimeConf(model_dir="{{.ScratchDir}}")
+{{else}}
+    runtime_conf = None
+{{end}}
+    experiment = Experiment(
+        user="shangchun.sun",  # TODO(joyyoj) pai will check user name be a valid user, removed later.
+        engine={{.EngineCode}},
+        train=TrainConf(input=trainDs,
+{{if (ne .TrainClause.MaxSteps -1)}}
+                        max_steps={{.TrainClause.MaxSteps}},
+{{end}}
+                        ),
+        eval=EvalConf(input=evalDs,
+                      # FIXME(typhoonzero): Support configure metrics
+                      metrics_set=['accuracy'],
+{{if (ne .TrainClause.EvalSteps -1)}}
+                      steps={{.TrainClause.EvalSteps}},
+{{end}}
+                      start_delay_secs={{.TrainClause.EvalStartDelay}},
+                      throttle_secs={{.TrainClause.EvalThrottle}},
+                      ),
+        # FIXME(typhoonzero): Use ExportStrategy.BEST when possible.
+        exporter=ArksExporter(deploy_path=export_path, strategy=ExportStrategy.LATEST, compare_fn=Closure(best_auc_fn)),
+        runtime = runtime_conf,
+        model_builder=SQLFlowEstimatorBuilder())
+
+    if isinstance(experiment.engine, LocalEngine):
+        run_experiment(experiment)
+    else:
+        if "{{.ExitOnSubmit}}" == "false":
+            submit_experiment(experiment)
+        else:
+            submit_experiment(experiment, exit_on_submit=True)
+`
+
+const alpsPredTemplateText = `
+set odps.task.major.version=default;
+set odps.isolation.session.enable=true;
+set odps.service.mode=off;
+set odps.instance.priority = 0;
+set odps.sql.udf.timeout = 3000;
+
+set mst.model.path={{.ModelDir}};
+set mst.model.name={{.PredictInputModel}};
+set mst.oss.id={{.OSSID}};
+set mst.oss.key={{.OSSKey}};
+set mst.load.feature_map=false;
+
+set deepbreath.sparse.group.separator=:;
+set deepbreath.sparse.separator=,;
+set deepbreath.enable.sigmoid=false;
+set odps.sql.mapper.split.size=64;
+
+set alps.custom.output=predictions;
+
+CREATE TABLE IF NOT EXISTS {{.PredictOutputTable}} AS SELECT {{.PredictUDF}} FROM {{.PredictInputTable}};
+`
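
A worked example of the new grammar path, taken from testMaxcomputeUDFPredict in sql/parser_test.go (the table, UDF, model names, and OSS credentials are the test's placeholders, not real objects):

    SELECT predict_fun(concat(",", col_1, col_2)) AS (info, score) FROM db.table
    PREDICT db.predict_result
    WITH OSS_KEY=a, OSS_ID=b
    USING sqlflow_models.my_model;

For such a statement, alpsPred renders alpsPredTemplateText with the filler built by newALPSPredictFiller, writes it to <cwd>/alps_pre.odps, and submits it roughly as follows (a sketch; the placeholders come from the parsed MaxCompute DSN, with the endpoint rewritten from https:// to http://):

    odpscmd -u <AccessID> -p <AccessKey> --endpoint=<endpoint> --project=<project> -s alps_pre.odps

The script's closing CREATE TABLE ... AS SELECT {{.PredictUDF}} FROM {{.PredictInputTable}} statement is what materializes the UDF's predictions into db.predict_result.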