Skip to content

Commit

Permalink
[BUGFIX] Fix custom function cannot use in returns() bug (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
tammypi authored and haoch committed Jan 11, 2019
1 parent 979c667 commit 04e0919
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public <T extends Tuple> DataStream<T> returns(String outStreamId) {
siddhiContext.setExtensions(environment.getExtensions());
siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
TypeInformation<T> typeInformation =
SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getAllEnrichedExecutionPlan(), outStreamId);
SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getAllEnrichedExecutionPlan(), outStreamId, siddhiContext);
siddhiContext.setOutputStreamType(typeInformation);
return returnsInternal(siddhiContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information.
Expand Down Expand Up @@ -84,6 +82,34 @@ public static AbstractDefinition getStreamDefinition(String executionPlan, Strin
}
}

public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId, SiddhiOperatorContext siddhiOperatorContext) {
SiddhiManager siddhiManager = null;
SiddhiAppRuntime runtime = null;
try {
siddhiManager = new SiddhiManager();
Map extensions = siddhiOperatorContext.getExtensions();
Iterator<Map.Entry<String,Class<?>>> iterator = extensions.entrySet().iterator();
while(iterator.hasNext()){
Map.Entry<String,Class<?>> entry = iterator.next();
siddhiManager.setExtension(entry.getKey(), entry.getValue());
}
runtime = siddhiManager.createSiddhiAppRuntime(executionPlan);
Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap();
if (definitionMap.containsKey(streamId)) {
return definitionMap.get(streamId);
} else {
throw new IllegalArgumentException("Unknown stream id" + streamId);
}
} finally {
if (runtime != null) {
runtime.shutdown();
}
if (siddhiManager != null) {
siddhiManager.shutdown();
}
}
}

public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
List<TypeInformation> types = new ArrayList<>();
for (Attribute attribute : definition.getAttributeList()) {
Expand All @@ -100,6 +126,10 @@ public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(Strin
return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId));
}

public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(String executionPlan, String streamId, SiddhiOperatorContext siddhiOperatorContext) {
return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId,siddhiOperatorContext));
}

@SuppressWarnings("unchecked")
private static final TypeInformation<GenericRecord> MAP_PROXY_TYPE_INFORMATION = TypeExtractor.createTypeInfo(GenericRecord.class);

Expand Down

0 comments on commit 04e0919

Please sign in to comment.