From d3befb610542663d1ca88b312e4de2eea05a7689 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Fri, 24 Nov 2023 11:58:51 +0530 Subject: [PATCH] modifying execute API to get column nullability state --- .../wrangler/api/NullHandlingException.java | 12 ++++++ .../executor/RecipePipelineExecutor.java | 8 +++- .../v2/DirectiveExecutionRequest.java | 12 +++++- .../directive/AbstractDirectiveHandler.java | 37 +++++++++++++++++-- .../service/directive/WorkspaceHandler.java | 21 ++++++++++- 5 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java new file mode 100644 index 000000000..8665601fc --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java @@ -0,0 +1,12 @@ +package io.cdap.wrangler.api; + +public class NullHandlingException extends Exception { + public NullHandlingException(Exception e) { + super(e); + } + + public NullHandlingException(String message) { + super(message); + } + +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java index 159d6d512..a09301f5a 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java @@ -30,6 +30,7 @@ import io.cdap.wrangler.api.ReportErrorAndProceed; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; import io.cdap.wrangler.schema.TransientStoreKeys; @@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline directives; + private HashMap nullabilityMap; - public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { + public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, + HashMap nullabilityMap) { this.context = context; this.recipeParser = recipeParser; + this.nullabilityMap = nullabilityMap; } /** diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java index a77ce1f5d..ffd96e946 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java @@ -17,7 +17,9 @@ package io.cdap.wrangler.proto.workspace.v2; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import java.util.Collections; +import java.util.HashMap; import java.util.List; /** @@ -26,10 +28,14 @@ public class DirectiveExecutionRequest { private final List directives; private final int limit; + private final HashMap nullabilityMap; - public DirectiveExecutionRequest(List directives, int limit) { + + public DirectiveExecutionRequest(List directives, int limit, + HashMap nullabilityMap) { this.directives = directives; this.limit = limit; + this.nullabilityMap = nullabilityMap; } public int getLimit() { @@ -39,4 +45,8 @@ public int getLimit() { public List getDirectives() { return directives == null ? Collections.emptyList() : directives; } + + public HashMap getNullabilityMap() { + return nullabilityMap; + } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index d7eae7960..045c2f2fb 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -46,6 +46,9 @@ import io.cdap.wrangler.proto.workspace.ColumnValidationResult; import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult; import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse; +import io.cdap.wrangler.proto.workspace.v2.SampleSpec; +import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; @@ -118,7 +121,11 @@ protected List executeDirectives( String namespace, List directives, List sample, - GrammarWalker.Visitor grammarVisitor) throws DirectiveParseException, E, RecipeException { + GrammarWalker.Visitor grammarVisitor, + Workspace workspace) throws DirectiveParseException, E, RecipeException { + + HashMap nullabilityMap = workspace.getColumnMappings().isEmpty() ? + new HashMap<>() : workspace.getColumnMappings(); if (directives.isEmpty()) { return sample; @@ -139,8 +146,11 @@ protected List executeDirectives( new ConfigDirectiveContext(DirectiveConfig.EMPTY)); try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser, new ServicePipelineContext( - namespace, ExecutorContext.Environment.SERVICE, - getContext(), TRANSIENT_STORE))) { + namespace, + ExecutorContext.Environment.SERVICE, + getContext(), + TRANSIENT_STORE), + nullabilityMap)) { List result = executor.execute(sample); List errors = executor.errors() @@ -281,4 +291,25 @@ private String getColumnDisplayType(Schema schema) { type = type.substring(0, 1).toUpperCase() + type.substring(1).toLowerCase(); return type; } + +// public Row nullHandler (Row inputRow, String columnName) { +// Schema currentSchema = TRANSIENT_STORE.get(TransientStoreKeys.OUTPUT_SCHEMA) != null ? +// TRANSIENT_STORE.get(TransientStoreKeys.OUTPUT_SCHEMA) +// : TRANSIENT_STORE.get(TransientStoreKeys.INPUT_SCHEMA); +// UserDefinedAction userDefinedAction = currentSchema.getField(columnName).getUserDefinedAction(); +// Row outputRow = inputRow; +// +// switch (userDefinedAction) { +// case NO_ACTION: +// return outputRow; +// +// case SKIP_ROW: +// return null; //check if the row is empty in the directive +//// case SEND_TO_ERROR_COLLECTOR: +//// case ERROR_PIPELINE: +//// case NULLABLE: +// } +// +// return inputRow; +// } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 82b45521e..bb39790ea 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; import io.cdap.wrangler.proto.workspace.v2.StageSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); + HashMap nullabilityMap = executionRequest.getNullabilityMap() == null ? + new HashMap<>() : executionRequest.getNullabilityMap(); + if (!nullabilityMap.isEmpty()) { + //change nullabilityMap in Workspace Object + changeNullability(nullabilityMap, workspaceId); + } List result = executeDirectives(ns.getName(), directives, detail, userDirectivesCollector); DirectiveExecutionResponse response = generateExecutionResponse(result, @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque return response; } + private void changeNullability(HashMap columnMappings, + WorkspaceId workspaceId) throws Exception { + try { + Workspace workspace = wsStore.getWorkspace(workspaceId); + workspace.setColumnMappings(columnMappings); + wsStore.updateWorkspace(workspaceId, workspace); + } catch (Exception e) { + throw new RuntimeException("Error in setting nullabilityMap of columns ", e); + } + } + + /** * Get source specs, contains some hacky way on dealing with the csv parser */ @@ -580,7 +599,7 @@ private List executeLocally(String namespace, List(detail.getSample()), - grammarVisitor); + grammarVisitor, detail.getWorkspace()); } /**