-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Setup for Wrangler SQL Execution Support #690
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,8 @@ | |
|
||
package io.cdap.wrangler.api; | ||
|
||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
import io.cdap.wrangler.api.parser.UsageDefinition; | ||
|
||
import java.util.List; | ||
|
@@ -51,7 +53,8 @@ | |
* } | ||
* </code> | ||
*/ | ||
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics { | ||
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics, | ||
DirectiveRelationalTransform { | ||
/** | ||
* This defines a interface variable that is static and final for specify | ||
* the {@code type} of the plugin this interface would provide. | ||
|
@@ -126,4 +129,11 @@ default List<EntityCountMetric> getCountMetrics() { | |
// no op | ||
return null; | ||
} | ||
|
||
@Override | ||
default Relation transform(RelationalTranformContext relationalTranformContext, | ||
Relation relation) { | ||
// no-op | ||
Review comment: Looking just at this class, it seems like every existing directive would become a no-op in SQL, which would be incorrect. It only makes sense once you see the isSQLSupported() method in the DirectiveRelationalTransform interface, which defaults to false. I think it would be cleaner if the Directive interface did not extend DirectiveRelationalTransform, with only directives that do support SQL (like Drop) implementing it. Then the isSQLSupported() method is not needed; Wrangler just checks whether the directive is an instance of DirectiveRelationalTransform. This way it is impossible to implement a directive whose transform() and isSQLSupported() methods don't match up. |
||
return relation; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Copyright © 2023 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.wrangler.api; | ||
|
||
import io.cdap.cdap.etl.api.relational.InvalidRelation; | ||
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; | ||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
|
||
/** | ||
* {@link DirectiveRelationalTransform} provides relational (SQL) transform support for | ||
* wrangler directives. | ||
* | ||
* <p>Both methods have defaults that mean "not supported": {@link #transform} returns an | ||
* {@link InvalidRelation} and {@link #isSQLSupported()} returns {@code false}. An implementation | ||
* that overrides {@link #transform} with a real transformation should also override | ||
* {@link #isSQLSupported()} to return {@code true}, so that the two methods stay consistent. | ||
*/ | ||
public interface DirectiveRelationalTransform extends LinearRelationalTransform { | ||
|
||
/** | ||
* Applies this directive to the given relation as a linear relational transform. | ||
* | ||
* <p>The default implementation performs no transformation; it returns an | ||
* {@link InvalidRelation} whose message states that SQL execution is not supported for | ||
* the directive. | ||
* | ||
* @param relationalTranformContext transformation context with engine, input and output parameters | ||
* @param relation input relation upon which the transformation is applied. | ||
* @return transformed relation as the output relation. By default, returns an Invalid relation | ||
* for unsupported directives. | ||
*/ | ||
default Relation transform(RelationalTranformContext relationalTranformContext, | ||
Relation relation) { | ||
return new InvalidRelation("SQL execution for the directive is currently not supported."); | ||
} | ||
|
||
/** | ||
* Indicates whether the directive is supported by relational transformation or not. | ||
* | ||
* @return {@code true} if the directive can be executed as SQL, {@code false} otherwise. | ||
* By default, returns false, indicating that the directive is currently not supported. | ||
*/ | ||
default boolean isSQLSupported() { | ||
return false; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,6 @@ | |
import io.cdap.cdap.etl.api.relational.Expression; | ||
import io.cdap.cdap.etl.api.relational.ExpressionFactory; | ||
import io.cdap.cdap.etl.api.relational.InvalidRelation; | ||
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; | ||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; | ||
|
@@ -50,9 +49,11 @@ | |
import io.cdap.wrangler.api.Directive; | ||
import io.cdap.wrangler.api.DirectiveLoadException; | ||
import io.cdap.wrangler.api.DirectiveParseException; | ||
import io.cdap.wrangler.api.DirectiveRelationalTransform; | ||
import io.cdap.wrangler.api.EntityCountMetric; | ||
import io.cdap.wrangler.api.ErrorRecord; | ||
import io.cdap.wrangler.api.ExecutorContext; | ||
import io.cdap.wrangler.api.RecipeException; | ||
import io.cdap.wrangler.api.RecipeParser; | ||
import io.cdap.wrangler.api.RecipePipeline; | ||
import io.cdap.wrangler.api.RecipeSymbol; | ||
|
@@ -103,7 +104,8 @@ | |
@Plugin(type = "transform") | ||
@Name("Wrangler") | ||
@Description("Wrangler - A interactive tool for data cleansing and transformation.") | ||
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements LinearRelationalTransform { | ||
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements DirectiveRelationalTransform { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class); | ||
|
||
// Configuration specifying the dataprep application and service name. | ||
|
@@ -124,6 +126,12 @@ public class Wrangler extends Transform<StructuredRecord, StructuredRecord> impl | |
private static final String PRECONDITION_LANGUAGE_JEXL = "jexl"; | ||
private static final String PRECONDITION_LANGUAGE_SQL = "sql"; | ||
|
||
// Sql execution value | ||
private static final String SQL_ENABLED = "yes"; | ||
|
||
// wrangler sql execution mode enabled or not | ||
public boolean isSqlEnabled = false; | ||
|
||
// Plugin configuration. | ||
private final Config config; | ||
|
||
|
@@ -172,7 +180,7 @@ public void configurePipeline(PipelineConfigurer configurer) { | |
try { | ||
Schema iSchema = configurer.getStageConfigurer().getInputSchema(); | ||
if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*") | ||
|| config.getField().equals("#"))) { | ||
|| config.getField().equals("#"))) { | ||
validateInputSchema(iSchema, collector); | ||
} | ||
|
||
|
@@ -185,12 +193,22 @@ public void configurePipeline(PipelineConfigurer configurer) { | |
} | ||
} | ||
|
||
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { | ||
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { | ||
harshdeeppruthi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
isSqlEnabled = checkSQLExecutionEnabled(config); | ||
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE) | ||
|| !config.containsMacro(Config.NAME_SQL_EXECUTION)) { | ||
if (!config.containsMacro(Config.NAME_SQL_EXECUTION)) { | ||
if (isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())) { | ||
collector.addFailure("JEXL Precondition is not supported when SQL execution is enabled.", null); | ||
} | ||
if (!isSqlEnabled && PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { | ||
collector.addFailure("SQL Precondition is only supported when SQL execution is enabled.", null); | ||
} | ||
} | ||
if (isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { | ||
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) { | ||
validatePrecondition(config.getPreconditionSQL(), true, collector); | ||
} | ||
validateSQLModeDirectives(collector); | ||
validateSQLUDDs(collector); | ||
} else { | ||
if (!config.containsMacro(Config.NAME_PRECONDITION)) { | ||
validatePrecondition(config.getPreconditionJEXL(), false, collector); | ||
|
@@ -248,6 +266,12 @@ public void configurePipeline(PipelineConfigurer configurer) { | |
collector.addFailure(e.getMessage(), null); | ||
} | ||
|
||
// If SQL Execution is enabled, check that the directives supports sql execution | ||
if (isSqlEnabled) { | ||
List<Directive> sqlDirectives = getDirectivesList(config); | ||
validateSQLModeDirectives(collector, sqlDirectives); | ||
} | ||
|
||
// Based on the configuration create output schema. | ||
try { | ||
if (!config.containsMacro(Config.NAME_SCHEMA)) { | ||
|
@@ -259,7 +283,7 @@ public void configurePipeline(PipelineConfigurer configurer) { | |
} | ||
|
||
// Check if jexl pre-condition is not null or empty and if so compile expression. | ||
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { | ||
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION)) { | ||
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) | ||
&& checkPreconditionNotEmpty(false)) { | ||
try { | ||
|
@@ -352,7 +376,7 @@ public void initialize(TransformContext context) throws Exception { | |
} | ||
|
||
// Check if jexl pre-condition is not null or empty and if so compile expression. | ||
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { | ||
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { | ||
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) | ||
&& checkPreconditionNotEmpty(false)) { | ||
try { | ||
|
@@ -411,7 +435,7 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) | |
} | ||
|
||
// If pre-condition is set, then evaluate the precondition | ||
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) | ||
if (!isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) | ||
&& checkPreconditionNotEmpty(false)) { | ||
boolean skip = condition.apply(row); | ||
if (skip) { | ||
|
@@ -520,12 +544,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F | |
} | ||
} | ||
|
||
private void validateSQLModeDirectives(FailureCollector collector) { | ||
if (!Strings.isNullOrEmpty(config.getDirectives())) { | ||
collector.addFailure("Directives are not supported for precondition of type SQL", null) | ||
.withConfigProperty(Config.NAME_DIRECTIVES); | ||
private void validateSQLModeDirectives(FailureCollector collector, List<Directive> directives) { | ||
for (Directive directive : directives) { | ||
if (!directive.isSQLSupported()) { | ||
collector.addFailure(String.format("%s directive is not supported by SQL execution.", | ||
directive.define().getDirectiveName()), null) | ||
.withConfigProperty(Config.NAME_DIRECTIVES); | ||
} | ||
} | ||
} | ||
|
||
private void validateSQLUDDs(FailureCollector collector) { | ||
if (!Strings.isNullOrEmpty(config.getUDDs())) { | ||
collector.addFailure("UDDs are not supported for precondition of type SQL", null) | ||
.withConfigProperty(Config.NAME_UDD); | ||
|
@@ -544,6 +573,23 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) { | |
return false; | ||
} | ||
|
||
// Reports whether SQL execution is turned on for this plugin instance. | ||
// Returns false when the WRANGLER_EXECUTION_SQL feature flag is not enabled for getContext(); | ||
// otherwise returns true only if config.getSqlExecution() equals SQL_ENABLED, ignoring case. | ||
// A null sqlExecution value yields false, because String#equalsIgnoreCase(null) is false. | ||
// NOTE(review): this is also called from configurePipeline; confirm getContext() is usable there. | ||
private boolean checkSQLExecutionEnabled(Config config) { | ||
if (!Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext())) { | ||
return false; | ||
} | ||
|
||
return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution()); | ||
} | ||
|
||
// Parses the recipe configured on the plugin into its list of directives. | ||
// The recipe text from config.getDirectives() is first passed through MigrateToV2#migrate(), and the | ||
// result is parsed by a GrammarBasedParser built with "default" and this plugin's registry field. | ||
// Parsing problems propagate to the caller as DirectiveParseException or RecipeException. | ||
// NOTE(review): registry is used as-is; presumably callers must initialise it first - confirm. | ||
List<Directive> getDirectivesList(Config config) throws DirectiveParseException, RecipeException { | ||
String recipe = config.getDirectives(); | ||
List<Directive> directives = null; | ||
GrammarBasedParser parser = new GrammarBasedParser("default", | ||
new MigrateToV2(recipe).migrate(), registry); | ||
directives = parser.parse(); | ||
return directives; | ||
} | ||
|
||
/** | ||
* This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser} | ||
* with {@link NoOpDirectiveContext} | ||
|
@@ -569,23 +615,46 @@ private RecipeParser getRecipeParser(StageContext context) | |
|
||
@Override | ||
public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { | ||
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()) | ||
&& checkPreconditionNotEmpty(true)) { | ||
isSqlEnabled = checkSQLExecutionEnabled(config); | ||
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext) | ||
&& !Feature.WRANGLER_EXECUTION_SQL.isEnabled(relationalTranformContext)) { | ||
return new InvalidRelation("Plugin is not configured for relational transformation"); | ||
} | ||
|
||
if (Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { | ||
if ((isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) | ||
&& checkPreconditionNotEmpty(true)) { | ||
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext); | ||
if (!expressionFactory.isPresent()) { | ||
return new InvalidRelation("Cannot find an Expression Factory"); | ||
} | ||
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); | ||
relation = relation.filter(filterExpression); | ||
} | ||
} | ||
|
||
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { | ||
throw new RuntimeException("SQL Precondition feature is not available"); | ||
if (isSqlEnabled) { | ||
registry = SystemDirectiveRegistry.INSTANCE; | ||
try { | ||
registry.reload("default"); | ||
} catch (DirectiveLoadException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext); | ||
if (!expressionFactory.isPresent()) { | ||
return new InvalidRelation("Cannot find an Expression Factory"); | ||
List<Directive> directives = null; | ||
try { | ||
directives = getDirectivesList(config); | ||
} catch (DirectiveParseException | RecipeException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); | ||
return relation.filter(filterExpression); | ||
for (Directive directive : directives) { | ||
relation = directive | ||
.transform(relationalTranformContext, relation); | ||
} | ||
} | ||
|
||
return new InvalidRelation("Plugin is not configured for relational transformation"); | ||
return relation; | ||
} | ||
|
||
private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranformContext ctx) { | ||
|
@@ -637,6 +706,8 @@ private Map<String, String> getEntityMetricTags(EntityCountMetric metricDef) { | |
* Config for the plugin. | ||
*/ | ||
public static class Config extends PluginConfig { | ||
|
||
static final String NAME_SQL_EXECUTION = "sqlExecution"; | ||
static final String NAME_PRECONDITION = "precondition"; | ||
static final String NAME_PRECONDITION_SQL = "preconditionSQL"; | ||
static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage"; | ||
|
@@ -646,6 +717,11 @@ public static class Config extends PluginConfig { | |
static final String NAME_SCHEMA = "schema"; | ||
static final String NAME_ON_ERROR = "on-error"; | ||
|
||
@Name(NAME_SQL_EXECUTION) | ||
@Description("Toggle to configure execution language between JEXL and SQL") | ||
Review comment: JEXL is a technical term that many users won't understand. It's also not technically correct, as many directives are direct Java implementations that don't use JEXL scripting. Not sure what would be a better term though. Native? Default? |
||
@Macro | ||
@Nullable | ||
private String sqlExecution; | ||
Review comment: nit: add a newline after this line. Review comment: It's better to name this something like 'executionMode' and have an internal enum for it. This is more extensible than a boolean toggle for SQL or no SQL, as it allows us to add new modes in the future if needed. (As a side note, when a property is meant to be a toggle, it should be of Boolean type, not String type.) For example, we may eventually want to add an 'auto' mode that lets the engine choose, or maybe some BigQuery-specific SQL mode or something like that. |
||
@Name(NAME_PRECONDITION_LANGUAGE) | ||
@Description("Toggle to configure precondition language between JEXL and SQL") | ||
@Macro | ||
|
@@ -693,8 +769,9 @@ public static class Config extends PluginConfig { | |
@Nullable | ||
private final String onError; | ||
|
||
public Config(String preconditionLanguage, String precondition, String directives, String udds, | ||
String field, String schema, String onError) { | ||
public Config(String sqlExecution, String preconditionLanguage, String precondition, String directives, String udds, | ||
String field, String schema, String onError) { | ||
this.sqlExecution = sqlExecution; | ||
this.preconditionLanguage = preconditionLanguage; | ||
this.precondition = precondition; | ||
this.directives = directives; | ||
|
@@ -713,13 +790,13 @@ public String getOnError() { | |
} | ||
|
||
// Returns the configured precondition language. | ||
// When the preconditionLanguage field is null or empty, PRECONDITION_LANGUAGE_JEXL is returned instead. | ||
public String getPreconditionLanguage() { | ||
if (Strings.isNullOrEmpty(preconditionLanguage)) { | ||
// No language configured: fall back to JEXL for backward compatibility. | ||
return PRECONDITION_LANGUAGE_JEXL; | ||
} | ||
return preconditionLanguage; | ||
} | ||
|
||
// Returns the raw value of the sqlExecution property without applying any default. | ||
// The backing field is declared @Nullable, so callers must be prepared for a null result. | ||
public String getSqlExecution() { | ||
return sqlExecution; | ||
} | ||
|
||
// Returns the value of the precondition field, which callers treat as the JEXL precondition. | ||
public String getPreconditionJEXL() { | ||
return precondition; | ||
} | ||
|
@@ -741,4 +818,3 @@ public String getUDDs() { | |
} | ||
} | ||
} | ||
|
Review comment: You could add a description for this method, similar to the one on getCountMetrics().