diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java index 2a199263b..b06d94dd0 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java @@ -16,6 +16,9 @@ package io.cdap.wrangler.api; +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.parser.UsageDefinition; import java.util.List; @@ -51,7 +54,8 @@ * } * */ -public interface Directive extends Executor, List>, EntityMetrics { +public interface Directive extends Executor, List>, EntityMetrics, + DirectiveRelationalTransform { /** * This defines a interface variable that is static and final for specify * the {@code type} of the plugin this interface would provide. @@ -126,4 +130,5 @@ default List getCountMetrics() { // no op return null; } + } diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java new file mode 100644 index 000000000..80f92dd05 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; +import io.cdap.cdap.etl.api.relational.RelationalTransform; + +/** + * {@link DirectiveRelationalTransform} provides relational transform support for + * wrangler directives. + */ +public interface DirectiveRelationalTransform extends LinearRelationalTransform { + + /** + * Implementation of linear relational transform for each supported directive. + * + * @param relationalTranformContext transformation context with engine, input and output parameters + * @param relation input relation upon which the transformation is applied. + * @return transformed relation as the output relation. By default, returns an Invalid relation + * for unsupported directives. + */ + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + return new InvalidRelation("SQL execution for the directive is currently not supported."); + } + + /** + * Indicates whether the directive is supported by relational transformation or not. + * + * @return boolean value for the directive SQL support. + * By default, returns false, indicating that the directive is currently not supported. + */ + default boolean isSQLSupported() { + return false; + } + +} diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java new file mode 100644 index 000000000..b66ae2c78 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; + +/** + * Directive interface which supports Relational transformations + */ +public interface RelationalDirective extends Directive, LinearRelationalTransform { + + @Override + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + // no-op + return relation; + } +} diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java index a0b6db07e..8dfa91f4c 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -28,11 +32,11 @@ import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.annotations.Categories; import io.cdap.wrangler.api.lineage.Lineage; -import io.cdap.wrangler.api.lineage.Many; import io.cdap.wrangler.api.lineage.Mutation; import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; @@ -110,4 +114,20 @@ public Mutation lineage() { .conditional(source.value(), destination.value()) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(destination.value(), expressionFactory.get().compile(source.value())); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java index 2654f8730..d49d88625 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +36,7 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.Arrays; @@ -101,4 +106,18 @@ public Mutation lineage() { .relation(Many.columns(columns), targetColumn) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + String getColumnString = String.join(",", columns); + return relation.setColumn(targetColumn, expressionFactory.get().compile(String + .format("struct(%s)", getColumnString))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java index 2114ec2cf..3d415ca8b 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java @@ -19,11 +19,14 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; import io.cdap.wrangler.api.DirectiveParseException; import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.RelationalDirective; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.annotations.Categories; import io.cdap.wrangler.api.lineage.Lineage; @@ -88,4 +91,18 @@ public Mutation lineage() { .drop(Many.of(columns)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + for (String col: columns) { + relation = relation.dropColumn(col); + } + return relation; + } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Keep.java b/wrangler-core/src/main/java/io/cdap/directives/column/Keep.java index fbeb067cd..72cc1d1c4 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Keep.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Keep.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,9 +37,12 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -93,4 +101,22 @@ public Mutation lineage() { keep.forEach(column -> builder.relation(column, column)); return builder.build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + Map keepCol = SqlExpressionGenerator + .generateColumnExpMap(keep, expressionFactory.get()); + return relation.select(keepCol); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Merge.java b/wrangler-core/src/main/java/io/cdap/directives/column/Merge.java index 0bcd212e3..24e9883ec 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Merge.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Merge.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,10 +37,12 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.apache.commons.lang3.StringEscapeUtils; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A directive for merging two columns and creates a third column. @@ -108,4 +114,20 @@ public Mutation lineage() { .relation(Many.columns(col1, col2), Many.of(col1, col2, dest)) .build(); } + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(dest, expressionFactory.get() + .compile(String.format("CONCAT(%s,'%s',%s)", col1, delimiter, col2))); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Rename.java b/wrangler-core/src/main/java/io/cdap/directives/column/Rename.java index 06bd831f7..dbb5a23d7 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Rename.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Rename.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; import io.cdap.wrangler.utils.ColumnConverter; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive for renaming columns. @@ -82,4 +88,21 @@ public Mutation lineage() { .relation(source, target) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + relation = relation.setColumn(target.value(), expressionFactory.get().compile(source.value())); + return relation.dropColumn(source.value()); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java index b35da3283..62a90119c 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,8 +36,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive for swapping the column names. @@ -93,4 +99,19 @@ public Mutation lineage() { .relation(Many.of(left, right), Many.of(right, left)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + Relation tempRel = relation.setColumn("tempColumn", expressionFactory.get().compile(right)); + tempRel = tempRel.setColumn(right, expressionFactory.get().compile(left)); + return tempRel.setColumn(left, expressionFactory.get().compile("tempColumn")); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java index 525f7ba7a..65a8255c8 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -37,6 +41,7 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; @@ -148,4 +153,33 @@ public Mutation lineage() { .all(Many.of(col), Many.of(col)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + int currentpos = 1; + int columncounter = 1; + + for (int width : widths) { + String fixedLengthParseExpression = String.format("replace(substr(%s, %d, %d), '%s', \"\")", + col, currentpos, width, padding); + String filterExcessLengthExpression = String.format("%d <= (length(%s) - %d + 1)", width, col, currentpos); + + relation = relation.setColumn(String.format("%s_%d", col, columncounter), + expressionFactory.get().compile(fixedLengthParseExpression)) + .filter(expressionFactory.get().compile(filterExcessLengthExpression)); + + currentpos += width; + columncounter++; + } + + return relation; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java index 5e7a6d7de..916575a1a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,10 +37,12 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.json.JSONObject; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.regex.Pattern; /** @@ -147,5 +153,16 @@ private boolean matchPattern(String value) { } return matches; } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.filter(expressionFactory.get().compile("rlike(" + column + ", '" + pattern + "')")); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java index 5b4f57f58..cbdf290b2 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -35,9 +40,13 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * A directive for parsing a string into record using the record delimiter. @@ -112,4 +121,18 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + Map columnExpMap = new LinkedHashMap<>(); + columnExpMap.put(column, expressionFactory.get().compile( + String.format("explode(split(%s, \"%s\", %d))", column, delimiter, limit))); + return relation.select(columnExpMap); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java index 8848ed2a9..75645dc4a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,9 +37,11 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A directive for splitting the string into multiple {@link Row}s. @@ -109,5 +115,19 @@ public Mutation lineage() { .relation(Many.columns(column), Many.columns(column)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("explode(split(%s, '%s'))", column, regex))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java index 45542b254..5532620ba 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,9 +36,11 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.json.JSONObject; import java.util.List; +import java.util.Optional; /** * A directive to fill null or empty column values with a fixed value. @@ -104,4 +110,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile(String + .format("nvl2(%s, if(length(%s) == 0, \"%s\", %s), \"%s\")", + column, column, value, column, value))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java index 46656f5a5..808b6f300 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -87,4 +93,18 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("uuid()"))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/LeftTrim.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/LeftTrim.java index f739d003e..2e7dee5e1 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/LeftTrim.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/LeftTrim.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive for trimming whitespace from left side of a string @@ -87,4 +93,20 @@ public Mutation lineage() { .relation(col, col) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(col, expressionFactory.get().compile("LTRIM(" + col + ")")); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Lower.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Lower.java index e96af4c27..dd8587044 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Lower.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Lower.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,11 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; + /** * A directive for lower casing the 'column' value of type String. @@ -87,4 +94,15 @@ public Mutation lineage() { .relation(column, column) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile("LOWER(" + column + ")")); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/RightTrim.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/RightTrim.java index 3a000a631..2635b4e04 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/RightTrim.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/RightTrim.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** @@ -88,4 +94,20 @@ public Mutation lineage() { .relation(column, column) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile("RTRIM(" + column + ")")); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java index 60a359f05..a0438dabf 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,8 +38,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive to split email address into account and domain. @@ -128,4 +135,24 @@ private Pair extractDomainAndAccount(String emailId) { return new Pair<>(emailId.substring(0, lastidx), emailId.substring(lastidx + 1)); } } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + String accountExpression = String + .format("substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column); + String domainExpression = String.format("substring_index(%s, '@', -1)", column); + Relation accountRelation = relation + .setColumn(generatedAccountCol, expressionFactory.get().compile(accountExpression)); + return accountRelation.setColumn(generatedDomainCol, + expressionFactory.get().compile(domainExpression)); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/TitleCase.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/TitleCase.java index 0be5bf378..0d0850c4a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/TitleCase.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/TitleCase.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,9 +35,11 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.apache.commons.lang.WordUtils; import java.util.List; +import java.util.Optional; /** * A directive for title casing the 'column' value of type String. @@ -88,4 +94,20 @@ public Mutation lineage() { .relation(column, column) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile("initcap(" + column + ")")); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Trim.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Trim.java index 9f1e4aed1..3c20bfcd2 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Trim.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Trim.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,8 +36,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive for trimming whitespace from both sides of a string @@ -88,4 +94,19 @@ public Mutation lineage() { .relation(column, column) .build(); } + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile("TRIM(" + column + ")")); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Upper.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Upper.java index 0eb09dc94..d664f265f 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Upper.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Upper.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A Wrangler step for upper casing the 'column' value of type String. @@ -87,4 +93,20 @@ public Mutation lineage() { .relation(column, column) .build(); } + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile("UPPER(" + column + ")")); + } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java index 0ae4df352..98ca135cd 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,10 +35,12 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.List; +import java.util.Optional; /** * A Executor to decodes a column with url encoding. @@ -101,4 +107,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn( + column, expressionFactory.get().compile( + String.format("reflect('java.net.URLDecoder', 'decode', %s, 'utf-8')", column))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java index 204e380c7..b693672f4 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,10 +35,12 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.List; +import java.util.Optional; /** * A Executor to encode a column with url encoding. @@ -101,4 +107,16 @@ public List execute(List rows, ExecutorContext context) throws Directi } return rows; } + + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn( + column, expressionFactory.get().compile( + String.format("reflect('java.net.URLEncoder', 'encode', %s, 'utf-8')", column))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java index b25cd8a65..5a7ef7139 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java @@ -23,6 +23,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -38,6 +42,7 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; @@ -115,4 +120,18 @@ public Mutation lineage() { columns.forEach(column -> builder.relation(column, column)); return builder.build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + String getColumnString = String.join(",", columns); + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("struct(%s)", getColumnString))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/registry/SystemDirectiveRegistry.java b/wrangler-core/src/main/java/io/cdap/wrangler/registry/SystemDirectiveRegistry.java index d886e0017..c7d40f5b2 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/registry/SystemDirectiveRegistry.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/registry/SystemDirectiveRegistry.java @@ -20,6 +20,7 @@ import io.cdap.cdap.api.artifact.ArtifactSummary; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveLoadException; +import io.cdap.wrangler.api.RelationalDirective; import org.reflections.Reflections; import java.util.ArrayList; @@ -84,6 +85,7 @@ public SystemDirectiveRegistry(List namespaces) throws DirectiveLoadExce try { Reflections reflections = new Reflections(namespace); Set> system = reflections.getSubTypesOf(Directive.class); + system.addAll(reflections.getSubTypesOf(RelationalDirective.class)); for (Class directive : system) { DirectiveInfo info = DirectiveInfo.fromSystem(directive); registry.put(info.name(), info); diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java new file mode 100644 index 000000000..b249663bc --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java @@ -0,0 +1,108 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.utils; + +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; +import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; + +import javax.annotation.Nullable; + +/** + * Utility class that contains methods for sql expression generation. + */ +public class SqlExpressionGenerator { + + public static Optional> getExpressionFactory(RelationalTranformContext ctx) { + return ctx.getEngine().getExpressionFactory(StringExpressionFactoryType.SQL); + } + + public static Map generateColumnExpMap(Collection columns, ExpressionFactory factory) { + Map columnExpMap = new LinkedHashMap<>(); + columns.forEach((colName)-> columnExpMap.put((String) colName, factory.compile(colName.toString()))); + return columnExpMap; + } + + public static String getColumnTypeExp(String toType, String column, @Nullable Integer scale) { + toType = toType.toUpperCase(); + String expression; + switch (toType) { + case "INTEGER": + case "I64": + case "INT": { + expression = "CAST(" + column + " AS INT)"; + return expression; + } + + case "I32": + case "SHORT": { + expression = "CAST(" + column + " AS SMALLINT)"; + return expression; + } + + case "LONG": { + expression = "CAST(" + column + " AS BIGINT)"; + return expression; + } + + case "BOOL": + case "BOOLEAN": { + expression = "CAST(" + column + " AS BOOLEAN)"; + return expression; + } + + case "STRING": { + expression = "CAST(" + column + " AS STRING)"; + return expression; + } + + case "FLOAT": { + expression = "CAST(" + column + " AS FLOAT)"; + return expression; + } + + case "DOUBLE": { + expression = "CAST(" + column + " AS DOUBLE)"; + return expression; + } + + case "DECIMAL": { + if (scale != null) { + expression = String.format("CAST(%s AS DECIMAL(38,%d))", column, scale); + return expression; + } else { + expression = String.format("CAST(%s AS DECIMAL)", column); + } + return expression; + } + + case "BYTES": { + expression = "CAST(" + column + " AS TINYINT)"; + return expression; + } + + default: + return column; + } + } +} diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..7d0bec24d 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -38,21 +38,22 @@ import io.cdap.cdap.etl.api.relational.Expression; import io.cdap.cdap.etl.api.relational.ExpressionFactory; import io.cdap.cdap.etl.api.relational.InvalidRelation; -import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; import io.cdap.cdap.etl.api.relational.Relation; import io.cdap.cdap.etl.api.relational.RelationalTranformContext; -import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; import io.cdap.cdap.features.Feature; import io.cdap.directives.aggregates.DefaultTransientStore; import io.cdap.wrangler.api.CompileException; import io.cdap.wrangler.api.CompileStatus; import io.cdap.wrangler.api.Compiler; import io.cdap.wrangler.api.Directive; +import io.cdap.wrangler.api.DirectiveExecutionException; import io.cdap.wrangler.api.DirectiveLoadException; import io.cdap.wrangler.api.DirectiveParseException; +import io.cdap.wrangler.api.DirectiveRelationalTransform; import io.cdap.wrangler.api.EntityCountMetric; import io.cdap.wrangler.api.ErrorRecord; import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.RecipeException; import io.cdap.wrangler.api.RecipeParser; import io.cdap.wrangler.api.RecipePipeline; import io.cdap.wrangler.api.RecipeSymbol; @@ -72,7 +73,9 @@ import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; import io.cdap.wrangler.registry.UserDirectiveRegistry; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import io.cdap.wrangler.utils.StructuredToRowTransformer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +106,7 @@ @Plugin(type = "transform") @Name("Wrangler") @Description("Wrangler - A interactive tool for data cleansing and transformation.") -public class Wrangler extends Transform implements LinearRelationalTransform { +public class Wrangler extends Transform implements DirectiveRelationalTransform { private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class); // Configuration specifying the dataprep application and service name. @@ -121,8 +124,14 @@ public class Wrangler extends Transform impl public static final String DIRECTIVE_ENTITY_TYPE = "directive"; // Precondition languages - private static final String PRECONDITION_LANGUAGE_JEXL = "jexl"; - private static final String PRECONDITION_LANGUAGE_SQL = "sql"; + private static final String JEXL = "jexl"; + private static final String SQL = "sql"; + + // Sql execution value + private static final String SQL_ENABLED = "yes"; + + // wrangler sql execution mode enabled or not + public boolean isSqlenabled = false; // Plugin configuration. private final Config config; @@ -186,11 +195,11 @@ public void configurePipeline(PipelineConfigurer configurer) { } if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { + if (checkSQLExecution(config)) { if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) { validatePrecondition(config.getPreconditionSQL(), true, collector); } - validateSQLModeDirectives(collector); + validateSQLUDDs(collector); } else { if (!config.containsMacro(Config.NAME_PRECONDITION)) { validatePrecondition(config.getPreconditionJEXL(), false, collector); @@ -242,6 +251,12 @@ public void configurePipeline(PipelineConfigurer configurer) { } } } + + // check if the directive is supported by SQL + if (checkSQLExecution(config)) { + validateSQLModeDirectives(collector, getDirectivesList(config)); + } + } catch (CompileException e) { collector.addFailure("Compilation error occurred : " + e.getMessage(), null); } catch (DirectiveParseException e) { @@ -260,8 +275,7 @@ public void configurePipeline(PipelineConfigurer configurer) { // Check if jexl pre-condition is not null or empty and if so compile expression. if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!checkSQLExecution(config) && checkPreconditionNotEmpty(false)) { try { new Precondition(config.getPreconditionJEXL()); } catch (PreconditionException e) { @@ -350,11 +364,12 @@ public void initialize(TransformContext context) throws Exception { context.getStageName()), e ); } - + // initialize the wrangler sql mode + isSqlenabled = checkSQLExecution(config); + // Check if jexl pre-condition is not null or empty and if so compile expression. if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!isSqlenabled && checkPreconditionNotEmpty(false)) { try { condition = new Precondition(config.getPreconditionJEXL()); } catch (PreconditionException e) { @@ -411,8 +426,7 @@ public void transform(StructuredRecord input, Emitter emitter) } // If pre-condition is set, then evaluate the precondition - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!isSqlenabled && checkPreconditionNotEmpty(false)) { boolean skip = condition.apply(row); if (skip) { getContext().getMetrics().count("precondition.filtered", 1); @@ -520,12 +534,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F } } - private void validateSQLModeDirectives(FailureCollector collector) { - if (!Strings.isNullOrEmpty(config.getDirectives())) { - collector.addFailure("Directives are not supported for precondition of type SQL", null) - .withConfigProperty(Config.NAME_DIRECTIVES); + private void validateSQLModeDirectives(FailureCollector collector, List directives) { + for (Directive directive : directives) { + if (!directive.isSQLSupported()) { + collector.addFailure(String.format("%s directive is not supported by SQL execution.", + directive.define().getDirectiveName()), null) + .withConfigProperty(Config.NAME_DIRECTIVES); + } } + } + private void validateSQLUDDs (FailureCollector collector) { if (!Strings.isNullOrEmpty(config.getUDDs())) { collector.addFailure("UDDs are not supported for precondition of type SQL", null) .withConfigProperty(Config.NAME_UDD); @@ -544,6 +563,33 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) { return false; } + private boolean checkSQLExecution(Config config) { + if (!(Feature.WRANGLER_PRECONDITION_SQL.isEnabled(getContext()) + || Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext()))) { + // disabling SQL execution for precondition and directives + return false; + } + + if (!Strings.isNullOrEmpty(config.getPreconditionLanguage())) { + return SQL.equalsIgnoreCase(config.getPreconditionLanguage()); + } + + if (!Strings.isNullOrEmpty(config.getSqlExecution())) { + return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution()); + } + + // for backwards compatibility + return false; + } + + List getDirectivesList(Config config) throws DirectiveParseException, RecipeException { + String recipe = config.getDirectives(); + GrammarBasedParser parser = new GrammarBasedParser("default", + new MigrateToV2(recipe).migrate(), registry); + List directives = parser.parse(); + return directives; + } + /** * This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser} * with {@link NoOpDirectiveContext} @@ -569,27 +615,46 @@ private RecipeParser getRecipeParser(StageContext context) @Override public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(true)) { + if (!checkSQLExecution(config)) { + return new InvalidRelation("Plugin is not configured for relational transformation"); + } - if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { - throw new RuntimeException("SQL Precondition feature is not available"); - } + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } - Optional> expressionFactory = getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); - } + Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); + Relation filteredRelation = relation.filter(filterExpression); - Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); - return relation.filter(filterExpression); + registry = SystemDirectiveRegistry.INSTANCE; + try { + registry.reload("default"); + } catch (DirectiveLoadException e) { + throw new RuntimeException(e); } - return new InvalidRelation("Plugin is not configured for relational transformation"); + List directives = null; + try { + directives = getDirectivesList(config); + } catch (DirectiveParseException | RecipeException e) { + throw new RuntimeException(e); + } + + for (Directive directive : directives) { + filteredRelation = directive + .transform(relationalTranformContext, filteredRelation); + } + return filteredRelation; } - private Optional> getExpressionFactory(RelationalTranformContext ctx) { - return ctx.getEngine().getExpressionFactory(StringExpressionFactoryType.SQL); + private List getColumnsOfDropSQL(String sql) { + List cols = new ArrayList<>(); + for (String col : sql.split(" ")[2].split(",")) { + cols.add(col.trim()); + } + return cols; } /** @@ -640,8 +705,10 @@ public static class Config extends PluginConfig { static final String NAME_PRECONDITION = "precondition"; static final String NAME_PRECONDITION_SQL = "preconditionSQL"; static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage"; + static final String NAME_SQL_EXECUTION = "sqlExecution"; static final String NAME_FIELD = "field"; static final String NAME_DIRECTIVES = "directives"; + static final String NAME_RELATIONAL_DIRECTIVES = "relationalDirectives"; static final String NAME_UDD = "udd"; static final String NAME_SCHEMA = "schema"; static final String NAME_ON_ERROR = "on-error"; @@ -652,6 +719,12 @@ public static class Config extends PluginConfig { @Nullable private String preconditionLanguage; + @Name(NAME_SQL_EXECUTION) + @Description("Toggle to configure execution language between JEXL and SQL") + @Macro + @Nullable + private String sqlExecution; + @Name(NAME_PRECONDITION) @Description("JEXL Precondition expression specifying filtering before applying directives (true to filter)") @Macro @@ -672,6 +745,7 @@ public static class Config extends PluginConfig { @Nullable private String directives; + @Name(NAME_UDD) @Description("List of User Defined Directives (UDD) that have to be loaded.") @Nullable @@ -693,9 +767,10 @@ public static class Config extends PluginConfig { @Nullable private final String onError; - public Config(String preconditionLanguage, String precondition, String directives, String udds, - String field, String schema, String onError) { + public Config(String preconditionLanguage, String sqlExecution, String precondition, String directives, + String udds, String field, String schema, String onError, String relationalDirectives) { this.preconditionLanguage = preconditionLanguage; + this.sqlExecution = sqlExecution; this.precondition = precondition; this.directives = directives; this.udds = udds; @@ -713,12 +788,11 @@ public String getOnError() { } public String getPreconditionLanguage() { - if (Strings.isNullOrEmpty(preconditionLanguage)) { - // due to backward compatibility... - return PRECONDITION_LANGUAGE_JEXL; - } return preconditionLanguage; } + public String getSqlExecution() { + return sqlExecution; + } public String getPreconditionJEXL() { return precondition; @@ -741,4 +815,3 @@ public String getUDDs() { } } } - diff --git a/wrangler-transform/widgets/Wrangler-transform.json b/wrangler-transform/widgets/Wrangler-transform.json index 9f9c7611e..6a7738215 100644 --- a/wrangler-transform/widgets/Wrangler-transform.json +++ b/wrangler-transform/widgets/Wrangler-transform.json @@ -37,6 +37,25 @@ ] } }, + { + "widget-type": "radio-group", + "name": "sqlExecution", + "label": "Enable SQL Execution", + "widget-attributes": { + "layout": "inline", + "default": "no", + "options": [ + { + "id": "yes", + "label": "Yes" + }, + { + "id": "no", + "label": "No" + } + ] + } + }, { "widget-type": "textbox", "label": "Precondition (JEXL)", @@ -50,7 +69,7 @@ "label": "Precondition (SQL)", "name": "preconditionSQL", "widget-attributes" : { - "default" : "false" + "default" : "true" } } ] @@ -73,6 +92,24 @@ } ] }, + { + "label": "RelationalDirectives", + "properties": [ + { + "widget-type": "wrangler-relational-directives", + "label": "RelationalRecipe", + "name": "RelationalDirectives", + "widget-attributes" : { + "placeholder" : "#pragma load-directives my-directive; my-directive :body;" + } + }, + { + "widget-type": "csv", + "label": "User Defined SQL(UDS)", + "name": "uds" + } + ] + }, { "label" : "Error Handling", "properties" : [ @@ -108,7 +145,7 @@ { "name": "PreconditionValueNotSQL", "condition": { - "expression": "expressionLanguage != 'sql'" + "expression": "expressionLanguage == 'jexl' && sqlExecution == 'no'" }, "show": [ { @@ -120,7 +157,7 @@ { "name": "preconditionValueSQL", "condition": { - "expression": "expressionLanguage == 'sql'" + "expression": "expressionLanguage == 'sql' || sqlExecution == 'yes'" }, "show": [ { @@ -132,7 +169,7 @@ { "name": "preconditionSQLEnabled", "condition": { - "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true" + "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true && featureFlags['wrangler.execution.sql.enabled'] == false" }, "show": [ { @@ -140,6 +177,18 @@ "name": "expressionLanguage" } ] + }, + { + "name": "executionSQLEnabled", + "condition": { + "expression": "featureFlags['wrangler.execution.sql.enabled'] == true" + }, + "show": [ + { + "type": "properties", + "name": "sqlExecution" + } + ] } ], "outputs": [