Skip to content

Commit

Permalink
modifying execute API to get column nullability state
Browse files Browse the repository at this point in the history
  • Loading branch information
minurajeeve committed Nov 28, 2023
1 parent 5790e4b commit d3befb6
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.cdap.wrangler.api;

public class NullHandlingException extends Exception {
public NullHandlingException(Exception e) {
super(e);
}

public NullHandlingException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
Expand All @@ -40,6 +41,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
private HashMap<String, UserDefinedAction> nullabilityMap;

public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) {
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
HashMap<String, UserDefinedAction> nullabilityMap) {
this.context = context;
this.recipeParser = recipeParser;
this.nullabilityMap = nullabilityMap;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package io.cdap.wrangler.proto.workspace.v2;

import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
Expand All @@ -26,10 +28,14 @@
public class DirectiveExecutionRequest {
private final List<String> directives;
private final int limit;
private final HashMap<String, UserDefinedAction> nullabilityMap;

public DirectiveExecutionRequest(List<String> directives, int limit) {

public DirectiveExecutionRequest(List<String> directives, int limit,
HashMap<String, UserDefinedAction> nullabilityMap) {
this.directives = directives;
this.limit = limit;
this.nullabilityMap = nullabilityMap;
}

public int getLimit() {
Expand All @@ -39,4 +45,8 @@ public int getLimit() {
public List<String> getDirectives() {
return directives == null ? Collections.emptyList() : directives;
}

public HashMap<String, UserDefinedAction> getNullabilityMap() {
return nullabilityMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.cdap.wrangler.proto.workspace.ColumnValidationResult;
import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult;
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
Expand Down Expand Up @@ -118,7 +121,11 @@ protected <E extends Exception> List<Row> executeDirectives(
String namespace,
List<String> directives,
List<Row> sample,
GrammarWalker.Visitor<E> grammarVisitor) throws DirectiveParseException, E, RecipeException {
GrammarWalker.Visitor<E> grammarVisitor,
Workspace workspace) throws DirectiveParseException, E, RecipeException {

HashMap<String, UserDefinedAction> nullabilityMap = workspace.getColumnMappings().isEmpty() ?
new HashMap<>() : workspace.getColumnMappings();

if (directives.isEmpty()) {
return sample;
Expand All @@ -139,8 +146,11 @@ protected <E extends Exception> List<Row> executeDirectives(
new ConfigDirectiveContext(DirectiveConfig.EMPTY));
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser,
new ServicePipelineContext(
namespace, ExecutorContext.Environment.SERVICE,
getContext(), TRANSIENT_STORE))) {
namespace,
ExecutorContext.Environment.SERVICE,
getContext(),
TRANSIENT_STORE),
nullabilityMap)) {
List<Row> result = executor.execute(sample);

List<ErrorRecordBase> errors = executor.errors()
Expand Down Expand Up @@ -281,4 +291,25 @@ private String getColumnDisplayType(Schema schema) {
type = type.substring(0, 1).toUpperCase() + type.substring(1).toLowerCase();
return type;
}

// public Row nullHandler (Row inputRow, String columnName) {
// Schema currentSchema = TRANSIENT_STORE.get(TransientStoreKeys.OUTPUT_SCHEMA) != null ?
// TRANSIENT_STORE.get(TransientStoreKeys.OUTPUT_SCHEMA)
// : TRANSIENT_STORE.get(TransientStoreKeys.INPUT_SCHEMA);
// UserDefinedAction userDefinedAction = currentSchema.getField(columnName).getUserDefinedAction();
// Row outputRow = inputRow;
//
// switch (userDefinedAction) {
// case NO_ACTION:
// return outputRow;
//
// case SKIP_ROW:
// return null; //check if the row is empty in the directive
//// case SEND_TO_ERROR_COLLECTOR:
//// case ERROR_PIPELINE:
//// case NULLABLE:
// }
//
// return inputRow;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
Expand Down Expand Up @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque

WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ?
new HashMap<>() : executionRequest.getNullabilityMap();
if (!nullabilityMap.isEmpty()) {
//change nullabilityMap in Workspace Object
changeNullability(nullabilityMap, workspaceId);
}
List<Row> result = executeDirectives(ns.getName(), directives, detail,
userDirectivesCollector);
DirectiveExecutionResponse response = generateExecutionResponse(result,
Expand All @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
return response;
}

private void changeNullability(HashMap<String, UserDefinedAction> columnMappings,
WorkspaceId workspaceId) throws Exception {
try {
Workspace workspace = wsStore.getWorkspace(workspaceId);
workspace.setColumnMappings(columnMappings);
wsStore.updateWorkspace(workspaceId, workspace);
} catch (Exception e) {
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
}
}


/**
* Get source specs, contains some hacky way on dealing with the csv parser
*/
Expand Down Expand Up @@ -580,7 +599,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St
// load the udd
composite.reload(namespace);
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()),
grammarVisitor);
grammarVisitor, detail.getWorkspace());
}

/**
Expand Down

0 comments on commit d3befb6

Please sign in to comment.