diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java index 2a199263b..78038afef 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java @@ -16,6 +16,8 @@ package io.cdap.wrangler.api; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.parser.UsageDefinition; import java.util.List; @@ -51,7 +53,8 @@ * } * */ -public interface Directive extends Executor, List>, EntityMetrics { +public interface Directive extends Executor, List>, EntityMetrics, + DirectiveRelationalTransform { /** * This defines a interface variable that is static and final for specify * the {@code type} of the plugin this interface would provide. @@ -126,4 +129,11 @@ default List getCountMetrics() { // no op return null; } + + @Override + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + // no-op + return relation; + } } diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java new file mode 100644 index 000000000..943f564f6 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; + +/** + * {@link DirectiveRelationalTransform} provides relational transform support for + * wrangler directives. + */ +public interface DirectiveRelationalTransform extends LinearRelationalTransform { + + /** + * Implementation of linear relational transform for each supported directive. + * + * @param relationalTranformContext transformation context with engine, input and output parameters + * @param relation input relation upon which the transformation is applied. + * @return transformed relation as the output relation. By default, returns an Invalid relation + * for unsupported directives. + */ + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + return new InvalidRelation("SQL execution for the directive is currently not supported."); + } + + /** + * Indicates whether the directive is supported by relational transformation or not. + * + * @return boolean value for the directive SQL support. + * By default, returns false, indicating that the directive is currently not supported. + */ + default boolean isSQLSupported() { + return false; + } + +} diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java new file mode 100644 index 000000000..b66ae2c78 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/RelationalDirective.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; + +/** + * Directive interface which supports Relational transformations + */ +public interface RelationalDirective extends Directive, LinearRelationalTransform { + + @Override + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + // no-op + return relation; + } +} diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java index 2114ec2cf..6ff191863 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java @@ -19,6 +19,8 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -88,4 +90,18 @@ public Mutation lineage() { .drop(Many.of(columns)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + for (String col: columns) { + relation = relation.dropColumn(col); + } + return relation; + } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java new file mode 100644 index 000000000..2ff241ab3 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.utils; + +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; +import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; + +import java.util.Optional; + +/** + * Utility class that contains methods for sql expression generation. + */ +public class SqlExpressionGenerator { + + public static Optional> getExpressionFactory(RelationalTranformContext ctx) { + return ctx.getEngine().getExpressionFactory(StringExpressionFactoryType.SQL); + } +} diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..1fd6d9c1e 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -48,11 +48,14 @@ import io.cdap.wrangler.api.CompileStatus; import io.cdap.wrangler.api.Compiler; import io.cdap.wrangler.api.Directive; +import io.cdap.wrangler.api.DirectiveExecutionException; import io.cdap.wrangler.api.DirectiveLoadException; import io.cdap.wrangler.api.DirectiveParseException; +import io.cdap.wrangler.api.DirectiveRelationalTransform; import io.cdap.wrangler.api.EntityCountMetric; import io.cdap.wrangler.api.ErrorRecord; import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.RecipeException; import io.cdap.wrangler.api.RecipeParser; import io.cdap.wrangler.api.RecipePipeline; import io.cdap.wrangler.api.RecipeSymbol; @@ -72,6 +75,7 @@ import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; import io.cdap.wrangler.registry.UserDirectiveRegistry; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import io.cdap.wrangler.utils.StructuredToRowTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +107,7 @@ @Plugin(type = "transform") @Name("Wrangler") @Description("Wrangler - A interactive tool for data cleansing and transformation.") -public class Wrangler extends Transform implements LinearRelationalTransform { +public class Wrangler extends Transform implements DirectiveRelationalTransform { private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class); // Configuration specifying the dataprep application and service name. @@ -121,8 +125,15 @@ public class Wrangler extends Transform impl public static final String DIRECTIVE_ENTITY_TYPE = "directive"; // Precondition languages - private static final String PRECONDITION_LANGUAGE_JEXL = "jexl"; - private static final String PRECONDITION_LANGUAGE_SQL = "sql"; + private static final String JEXL = "jexl"; + private static final String SQL = "sql"; + + // Sql execution value + private static final String SQL_ENABLED = "yes"; + + // wrangler sql execution mode enabled or not + + public boolean isSqlenabled = false; // Plugin configuration. private final Config config; @@ -186,11 +197,11 @@ public void configurePipeline(PipelineConfigurer configurer) { } if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { + if (checkSQLExecution(config)) { if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) { validatePrecondition(config.getPreconditionSQL(), true, collector); } - validateSQLModeDirectives(collector); + validateSQLUDDs(collector); } else { if (!config.containsMacro(Config.NAME_PRECONDITION)) { validatePrecondition(config.getPreconditionJEXL(), false, collector); @@ -242,6 +253,14 @@ public void configurePipeline(PipelineConfigurer configurer) { } } } + + // check if the directive is supported by SQL + if (checkSQLExecution(config)) { + List sqlDirectives = null; + sqlDirectives = getDirectivesList(config); + validateSQLModeDirectives(collector, sqlDirectives); + } + } catch (CompileException e) { collector.addFailure("Compilation error occurred : " + e.getMessage(), null); } catch (DirectiveParseException e) { @@ -260,8 +279,7 @@ public void configurePipeline(PipelineConfigurer configurer) { // Check if jexl pre-condition is not null or empty and if so compile expression. if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!checkSQLExecution(config) && checkPreconditionNotEmpty(false)) { try { new Precondition(config.getPreconditionJEXL()); } catch (PreconditionException e) { @@ -350,11 +368,12 @@ public void initialize(TransformContext context) throws Exception { context.getStageName()), e ); } + // initialize the wrangler sql mode + isSqlenabled = checkSQLExecution(config); // Check if jexl pre-condition is not null or empty and if so compile expression. if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!isSqlenabled && checkPreconditionNotEmpty(false)) { try { condition = new Precondition(config.getPreconditionJEXL()); } catch (PreconditionException e) { @@ -411,8 +430,7 @@ public void transform(StructuredRecord input, Emitter emitter) } // If pre-condition is set, then evaluate the precondition - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(false)) { + if (!isSqlenabled && checkPreconditionNotEmpty(false)) { boolean skip = condition.apply(row); if (skip) { getContext().getMetrics().count("precondition.filtered", 1); @@ -520,12 +538,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F } } - private void validateSQLModeDirectives(FailureCollector collector) { - if (!Strings.isNullOrEmpty(config.getDirectives())) { - collector.addFailure("Directives are not supported for precondition of type SQL", null) - .withConfigProperty(Config.NAME_DIRECTIVES); + private void validateSQLModeDirectives(FailureCollector collector, List directives) { + for (Directive directive :directives) { + if (!directive.isSQLSupported()) { + collector.addFailure(String.format("%s directive is not supported by SQL execution.", + directive.define().getDirectiveName()), null) + .withConfigProperty(Config.NAME_DIRECTIVES); + } } + } + private void validateSQLUDDs (FailureCollector collector) { if (!Strings.isNullOrEmpty(config.getUDDs())) { collector.addFailure("UDDs are not supported for precondition of type SQL", null) .withConfigProperty(Config.NAME_UDD); @@ -544,6 +567,34 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) { return false; } + private boolean checkSQLExecution(Config config) { + if (!(Feature.WRANGLER_PRECONDITION_SQL.isEnabled(getContext()) + || Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext()))) { + // disabling SQL execution for precondition and directives + return false; + } + + if (!Strings.isNullOrEmpty(config.getPreconditionLanguage())) { + return SQL.equalsIgnoreCase(config.getPreconditionLanguage()); + } + + if (!Strings.isNullOrEmpty(config.getSqlExecution())) { + return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution()); + } + + // for backwards compatibility + return false; + } + + List getDirectivesList(Config config) throws DirectiveParseException, RecipeException { + String recipe = config.getDirectives(); + List directives = null; + GrammarBasedParser parser = new GrammarBasedParser("default", + new MigrateToV2(recipe).migrate(), registry); + directives = parser.parse(); + return directives; + } + /** * This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser} * with {@link NoOpDirectiveContext} @@ -569,27 +620,38 @@ private RecipeParser getRecipeParser(StageContext context) @Override public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(true)) { + if (!checkSQLExecution(config)) { + return new InvalidRelation("Plugin is not configured for relational transformation"); + } - if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { - throw new RuntimeException("SQL Precondition feature is not available"); - } + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } - Optional> expressionFactory = getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); - } + Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); + Relation filteredRelation = relation.filter(filterExpression); - Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); - return relation.filter(filterExpression); + registry = SystemDirectiveRegistry.INSTANCE; + try { + registry.reload("default"); + } catch (DirectiveLoadException e) { + throw new RuntimeException(e); } - return new InvalidRelation("Plugin is not configured for relational transformation"); - } + List directives = null; + try { + directives = getDirectivesList(config); + } catch (DirectiveParseException | RecipeException e) { + throw new RuntimeException(e); + } - private Optional> getExpressionFactory(RelationalTranformContext ctx) { - return ctx.getEngine().getExpressionFactory(StringExpressionFactoryType.SQL); + for (Directive directive : directives) { + filteredRelation = directive + .transform(relationalTranformContext, filteredRelation); + } + return filteredRelation; } /** @@ -640,8 +702,10 @@ public static class Config extends PluginConfig { static final String NAME_PRECONDITION = "precondition"; static final String NAME_PRECONDITION_SQL = "preconditionSQL"; static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage"; + static final String NAME_SQL_EXECUTION = "sqlExecution"; static final String NAME_FIELD = "field"; static final String NAME_DIRECTIVES = "directives"; + static final String NAME_RELATIONAL_DIRECTIVES = "relationalDirectives"; static final String NAME_UDD = "udd"; static final String NAME_SCHEMA = "schema"; static final String NAME_ON_ERROR = "on-error"; @@ -652,6 +716,12 @@ public static class Config extends PluginConfig { @Nullable private String preconditionLanguage; + @Name(NAME_SQL_EXECUTION) + @Description("Toggle to configure execution language between JEXL and SQL") + @Macro + @Nullable + private String sqlExecution; + @Name(NAME_PRECONDITION) @Description("JEXL Precondition expression specifying filtering before applying directives (true to filter)") @Macro @@ -693,9 +763,10 @@ public static class Config extends PluginConfig { @Nullable private final String onError; - public Config(String preconditionLanguage, String precondition, String directives, String udds, - String field, String schema, String onError) { + public Config(String preconditionLanguage, String sqlExecution, String precondition, String directives, + String udds, String field, String schema, String onError, String relationalDirectives) { this.preconditionLanguage = preconditionLanguage; + this.sqlExecution = sqlExecution; this.precondition = precondition; this.directives = directives; this.udds = udds; @@ -713,12 +784,11 @@ public String getOnError() { } public String getPreconditionLanguage() { - if (Strings.isNullOrEmpty(preconditionLanguage)) { - // due to backward compatibility... - return PRECONDITION_LANGUAGE_JEXL; - } return preconditionLanguage; } + public String getSqlExecution() { + return sqlExecution; + } public String getPreconditionJEXL() { return precondition; diff --git a/wrangler-transform/widgets/Wrangler-transform.json b/wrangler-transform/widgets/Wrangler-transform.json index 9f9c7611e..6a7738215 100644 --- a/wrangler-transform/widgets/Wrangler-transform.json +++ b/wrangler-transform/widgets/Wrangler-transform.json @@ -37,6 +37,25 @@ ] } }, + { + "widget-type": "radio-group", + "name": "sqlExecution", + "label": "Enable SQL Execution", + "widget-attributes": { + "layout": "inline", + "default": "no", + "options": [ + { + "id": "yes", + "label": "Yes" + }, + { + "id": "no", + "label": "No" + } + ] + } + }, { "widget-type": "textbox", "label": "Precondition (JEXL)", @@ -50,7 +69,7 @@ "label": "Precondition (SQL)", "name": "preconditionSQL", "widget-attributes" : { - "default" : "false" + "default" : "true" } } ] @@ -73,6 +92,24 @@ } ] }, + { + "label": "RelationalDirectives", + "properties": [ + { + "widget-type": "wrangler-relational-directives", + "label": "RelationalRecipe", + "name": "RelationalDirectives", + "widget-attributes" : { + "placeholder" : "#pragma load-directives my-directive; my-directive :body;" + } + }, + { + "widget-type": "csv", + "label": "User Defined SQL(UDS)", + "name": "uds" + } + ] + }, { "label" : "Error Handling", "properties" : [ @@ -108,7 +145,7 @@ { "name": "PreconditionValueNotSQL", "condition": { - "expression": "expressionLanguage != 'sql'" + "expression": "expressionLanguage == 'jexl' && sqlExecution == 'no'" }, "show": [ { @@ -120,7 +157,7 @@ { "name": "preconditionValueSQL", "condition": { - "expression": "expressionLanguage == 'sql'" + "expression": "expressionLanguage == 'sql' || sqlExecution == 'yes'" }, "show": [ { @@ -132,7 +169,7 @@ { "name": "preconditionSQLEnabled", "condition": { - "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true" + "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true && featureFlags['wrangler.execution.sql.enabled'] == false" }, "show": [ { @@ -140,6 +177,18 @@ "name": "expressionLanguage" } ] + }, + { + "name": "executionSQLEnabled", + "condition": { + "expression": "featureFlags['wrangler.execution.sql.enabled'] == true" + }, + "show": [ + { + "type": "properties", + "name": "sqlExecution" + } + ] } ], "outputs": [