Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup for Wrangler SQL Execution Support #690

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.wrangler.api;

import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.parser.UsageDefinition;

import java.util.List;
Expand Down Expand Up @@ -51,7 +53,8 @@
* }
* </code>
*/
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics {
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics,
DirectiveRelationalTransform {
/**
* This defines a interface variable that is static and final for specify
* the {@code type} of the plugin this interface would provide.
Expand Down Expand Up @@ -126,4 +129,11 @@ default List<EntityCountMetric> getCountMetrics() {
// no op
return null;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add a description for this method similar to getCountMetrics()

@Override
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
// no-op
Copy link
Contributor

@albertshau albertshau Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking just at this class, it seems like every existing directive would become a no-op in SQL, which would be incorrect. It only makes sense when seeing the isSQLSupported() method in the RelationalTransform interface, and that it defaults to false.

I think it would be cleaner if the Directive interface did not extend the RelationalTransform interface, with only directives that do support SQL (like Drop) implementing it. Then the isSQLSupported() method is not needed, Wrangler just checks if the directive is an instance of RelationTransform. This way it is impossible to implement a directive where the transform() and isSQLSupported() methods don't match up.

return relation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;

/**
* {@link DirectiveRelationalTransform} provides relational transform support for
* wrangler directives.
*/
public interface DirectiveRelationalTransform extends LinearRelationalTransform {

/**
* Implementation of linear relational transform for each supported directive.
*
* @param relationalTranformContext transformation context with engine, input and output parameters
* @param relation input relation upon which the transformation is applied.
* @return transformed relation as the output relation. By default, returns an Invalid relation
* for unsupported directives.
*/
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
return new InvalidRelation("SQL execution for the directive is currently not supported.");
}

/**
* Indicates whether the directive is supported by relational transformation or not.
*
* @return boolean value for the directive SQL support.
* By default, returns false, indicating that the directive is currently not supported.
*/
default boolean isSQLSupported() {
return false;
}

}
16 changes: 16 additions & 0 deletions wrangler-core/src/main/java/io/cdap/directives/column/Drop.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -88,4 +90,18 @@ public Mutation lineage() {
.drop(Many.of(columns))
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
for (String col: columns) {
relation = relation.dropColumn(col);
}
return relation;
}

@Override
public boolean isSQLSupported() {
return true;
}
}
136 changes: 106 additions & 30 deletions wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
Expand All @@ -50,9 +49,11 @@
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.DirectiveRelationalTransform;
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeException;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
Expand Down Expand Up @@ -103,7 +104,8 @@
@Plugin(type = "transform")
@Name("Wrangler")
@Description("Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements LinearRelationalTransform {
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements DirectiveRelationalTransform {

private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);

// Configuration specifying the dataprep application and service name.
Expand All @@ -124,6 +126,12 @@ public class Wrangler extends Transform<StructuredRecord, StructuredRecord> impl
private static final String PRECONDITION_LANGUAGE_JEXL = "jexl";
private static final String PRECONDITION_LANGUAGE_SQL = "sql";

// Sql execution value
private static final String SQL_ENABLED = "yes";

// wrangler sql execution mode enabled or not
public boolean isSqlEnabled = false;

// Plugin configuration.
private final Config config;

Expand Down Expand Up @@ -172,7 +180,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
try {
Schema iSchema = configurer.getStageConfigurer().getInputSchema();
if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*")
|| config.getField().equals("#"))) {
|| config.getField().equals("#"))) {
validateInputSchema(iSchema, collector);
}

Expand All @@ -185,12 +193,22 @@ public void configurePipeline(PipelineConfigurer configurer) {
}
}

if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
isSqlEnabled = checkSQLExecutionEnabled(config);
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)
|| !config.containsMacro(Config.NAME_SQL_EXECUTION)) {
if (!config.containsMacro(Config.NAME_SQL_EXECUTION)) {
if (isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())) {
collector.addFailure("JEXL Precondition is not supported when SQL execution is enabled.", null);
}
if (!isSqlEnabled && PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
collector.addFailure("SQL Precondition is only supported when SQL execution is enabled.", null);
}
}
if (isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) {
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
validatePrecondition(config.getPreconditionSQL(), true, collector);
}
validateSQLModeDirectives(collector);
validateSQLUDDs(collector);
} else {
if (!config.containsMacro(Config.NAME_PRECONDITION)) {
validatePrecondition(config.getPreconditionJEXL(), false, collector);
Expand Down Expand Up @@ -248,6 +266,12 @@ public void configurePipeline(PipelineConfigurer configurer) {
collector.addFailure(e.getMessage(), null);
}

// If SQL Execution is enabled, check that the directives supports sql execution
if (isSqlEnabled) {
List<Directive> sqlDirectives = getDirectivesList(config);
validateSQLModeDirectives(collector, sqlDirectives);
}

// Based on the configuration create output schema.
try {
if (!config.containsMacro(Config.NAME_SCHEMA)) {
Expand All @@ -259,7 +283,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
Expand Down Expand Up @@ -352,7 +376,7 @@ public void initialize(TransformContext context) throws Exception {
}

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
try {
Expand Down Expand Up @@ -411,7 +435,7 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
}

// If pre-condition is set, then evaluate the precondition
if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
if (!isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(false)) {
boolean skip = condition.apply(row);
if (skip) {
Expand Down Expand Up @@ -520,12 +544,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F
}
}

private void validateSQLModeDirectives(FailureCollector collector) {
if (!Strings.isNullOrEmpty(config.getDirectives())) {
collector.addFailure("Directives are not supported for precondition of type SQL", null)
.withConfigProperty(Config.NAME_DIRECTIVES);
private void validateSQLModeDirectives(FailureCollector collector, List<Directive> directives) {
for (Directive directive : directives) {
if (!directive.isSQLSupported()) {
collector.addFailure(String.format("%s directive is not supported by SQL execution.",
directive.define().getDirectiveName()), null)
.withConfigProperty(Config.NAME_DIRECTIVES);
}
}
}

private void validateSQLUDDs(FailureCollector collector) {
if (!Strings.isNullOrEmpty(config.getUDDs())) {
collector.addFailure("UDDs are not supported for precondition of type SQL", null)
.withConfigProperty(Config.NAME_UDD);
Expand All @@ -544,6 +573,23 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
return false;
}

private boolean checkSQLExecutionEnabled(Config config) {
if (!Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext())) {
return false;
}

return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution());
}

List<Directive> getDirectivesList(Config config) throws DirectiveParseException, RecipeException {
String recipe = config.getDirectives();
List<Directive> directives = null;
GrammarBasedParser parser = new GrammarBasedParser("default",
new MigrateToV2(recipe).migrate(), registry);
directives = parser.parse();
return directives;
}

/**
* This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser}
* with {@link NoOpDirectiveContext}
Expand All @@ -569,23 +615,46 @@ private RecipeParser getRecipeParser(StageContext context)

@Override
public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) {
if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())
&& checkPreconditionNotEmpty(true)) {
isSqlEnabled = checkSQLExecutionEnabled(config);
if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)
&& !Feature.WRANGLER_EXECUTION_SQL.isEnabled(relationalTranformContext)) {
return new InvalidRelation("Plugin is not configured for relational transformation");
}

if (Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
if ((isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()))
&& checkPreconditionNotEmpty(true)) {
Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
relation = relation.filter(filterExpression);
}
}

if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) {
throw new RuntimeException("SQL Precondition feature is not available");
if (isSqlEnabled) {
registry = SystemDirectiveRegistry.INSTANCE;
try {
registry.reload("default");
} catch (DirectiveLoadException e) {
throw new RuntimeException(e);
}

Optional<ExpressionFactory<String>> expressionFactory = getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
List<Directive> directives = null;
try {
directives = getDirectivesList(config);
} catch (DirectiveParseException | RecipeException e) {
throw new RuntimeException(e);
}

Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
return relation.filter(filterExpression);
for (Directive directive : directives) {
relation = directive
.transform(relationalTranformContext, relation);
}
}

return new InvalidRelation("Plugin is not configured for relational transformation");
return relation;
}

private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranformContext ctx) {
Expand Down Expand Up @@ -637,6 +706,8 @@ private Map<String, String> getEntityMetricTags(EntityCountMetric metricDef) {
* Config for the plugin.
*/
public static class Config extends PluginConfig {

static final String NAME_SQL_EXECUTION = "sqlExecution";
static final String NAME_PRECONDITION = "precondition";
static final String NAME_PRECONDITION_SQL = "preconditionSQL";
static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage";
Expand All @@ -646,6 +717,11 @@ public static class Config extends PluginConfig {
static final String NAME_SCHEMA = "schema";
static final String NAME_ON_ERROR = "on-error";

@Name(NAME_SQL_EXECUTION)
@Description("Toggle to configure execution language between JEXL and SQL")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JEXL is a technical term that many users won't understand. It's also not technically correct, as many directives are direct java implementations that don't use jexl scripting. Not sure what would be a better term though. Native? Default?

@Macro
@Nullable
private String sqlExecution;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a newline after this line

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to name this something like 'executionMode' and have an internal Enum for it. This is more extensible than a boolean toggle for SQL or no SQL, as it allows us to add new modes in the future if needed. (As a side note, when a property is meant to be a toggle, it should be of Boolean type, not String type).

For example, we may eventually want to add an 'auto' mode that lets the engine choose, or maybe some BigQuery specific SQL mode or something like that.

@Name(NAME_PRECONDITION_LANGUAGE)
@Description("Toggle to configure precondition language between JEXL and SQL")
@Macro
Expand Down Expand Up @@ -693,8 +769,9 @@ public static class Config extends PluginConfig {
@Nullable
private final String onError;

public Config(String preconditionLanguage, String precondition, String directives, String udds,
String field, String schema, String onError) {
public Config(String sqlExecution, String preconditionLanguage, String precondition, String directives, String udds,
String field, String schema, String onError) {
this.sqlExecution = sqlExecution;
this.preconditionLanguage = preconditionLanguage;
this.precondition = precondition;
this.directives = directives;
Expand All @@ -713,13 +790,13 @@ public String getOnError() {
}

public String getPreconditionLanguage() {
if (Strings.isNullOrEmpty(preconditionLanguage)) {
// due to backward compatibility...
return PRECONDITION_LANGUAGE_JEXL;
}
return preconditionLanguage;
}

public String getSqlExecution() {
return sqlExecution;
}

public String getPreconditionJEXL() {
return precondition;
}
Expand All @@ -741,4 +818,3 @@ public String getUDDs() {
}
}
}

Loading