
Commit 2fd2114

Upgrade version of flink and siddhi (#18 by @wujinhu)

* upgrade version of flink and siddhi

wujinhu authored and haoch committed Dec 8, 2018
1 parent 0abcc47
Showing 11 changed files with 40 additions and 53 deletions.

File 1 of 11
@@ -20,13 +20,13 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.siddhi.control.ControlEvent;
 import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
@@ -128,7 +128,7 @@ public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
     private static class NamedControlStream
         implements MapFunction<ControlEvent, Tuple2<String, Object>>, ResultTypeQueryable {
         private static final TypeInformation<Tuple2<String, Object>> TYPE_INFO =
-            TypeInfoParser.parse("Tuple2<java.lang.String, java.lang.Object>");
+            TypeInformation.of(new TypeHint<Tuple2<String, Object>>(){});
         private final String streamId;
 
         NamedControlStream(String streamId) {
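
TypeInfoParser was deprecated upstream and removed from later Flink releases, so the string-based type declarations it parsed no longer compile; TypeHint captures the same generics at compile time instead. A minimal standalone sketch of the replacement pattern (illustration only, not code from this commit):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TypeHintExample {
        public static void main(String[] args) {
            // The anonymous TypeHint subclass preserves the generic parameters,
            // so no runtime string parsing is involved.
            TypeInformation<Tuple2<String, Object>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Object>>() {});
            System.out.println(typeInfo); // prints: Java Tuple2<String, GenericType<java.lang.Object>>
        }
    }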

File 2 of 11: ControlEventSchema.java
@@ -2,8 +2,8 @@
 
 import java.io.IOException;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class ControlEventSchema extends AbstractDeserializationSchema<ControlEvent> {
 

File 3 of 11: ControlMessage.java
@@ -1,8 +1,9 @@
 package org.apache.flink.streaming.siddhi.control;
 
-import com.fasterxml.jackson.annotation.JsonRawValue;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonRawValue;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 public class ControlMessage{
     private final String type;
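
The Jackson changes in this file and the previous one follow the same recipe: imports move from the plain com.fasterxml coordinates to Flink's relocated copy under org.apache.flink.shaded.jackson2, so the library can no longer clash with whatever Jackson version a user job brings. Only the package prefix changes; a minimal sketch, assuming the shaded classes are on the classpath via Flink 1.7 dependencies:

    // Same Jackson API, relocated package (bundled with Flink's shaded dependencies).
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    public class ShadedJacksonExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Round-trip a small value to show the relocated classes behave identically.
            System.out.println(mapper.writeValueAsString(new int[]{1, 2, 3})); // [1,2,3]
        }
    }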

File 4 of 11
@@ -24,15 +24,15 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,

File 5 of 11
@@ -23,6 +23,8 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.siddhi.utils.GenericRecord;
@@ -33,8 +35,6 @@
 import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,

File 6 of 11
@@ -17,12 +17,11 @@
 
 package org.apache.flink.streaming.siddhi.utils;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.wso2.siddhi.core.SiddhiAppRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
@@ -86,20 +85,14 @@ public static AbstractDefinition getStreamDefinition(String executionPlan, Strin
     }
 
     public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
-        int tupleSize = definition.getAttributeList().size();
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("Tuple").append(tupleSize);
-        stringBuilder.append("<");
-        List<String> attributeTypes = new ArrayList<>();
+        List<TypeInformation> types = new ArrayList<>();
         for (Attribute attribute : definition.getAttributeList()) {
-            attributeTypes.add(getJavaType(attribute.getType()).getName());
+            types.add(TypeInformation.of(getJavaType(attribute.getType())));
         }
-        stringBuilder.append(StringUtils.join(attributeTypes, ","));
-        stringBuilder.append(">");
         try {
-            return TypeInfoParser.parse(stringBuilder.toString());
+            return Types.TUPLE(types.toArray(new TypeInformation[0]));
         } catch (IllegalArgumentException ex) {
-            throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+            throw new IllegalArgumentException("Unable to parse ", ex);
         }
     }
 
@@ -130,6 +123,6 @@ public static Class<?> getJavaType(Attribute.Type attributeType) {
     }
 
     public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
-        return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+        return Types.TUPLE(Types.STRING, typeInformation);
     }
 }
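
The getTupleTypeInformation rewrite above drops the "TupleN<...>" string assembly in favor of the programmatic Types factory, which composes TypeInformation values directly. A standalone sketch of the equivalence (illustration only):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    public class TypesTupleExample {
        public static void main(String[] args) {
            // Old style: TypeInfoParser.parse("Tuple3<Integer,Long,String>")
            // New style: compose the field types directly.
            TypeInformation<?> tupleType = Types.TUPLE(Types.INT, Types.LONG, Types.STRING);
            System.out.println(tupleType); // Java Tuple3<Integer, Long, String>
        }
    }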

File 7 of 11: SiddhiCEPITCase.java
@@ -45,7 +45,7 @@
 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.siddhi.control.ControlEvent;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -58,8 +58,7 @@
 /**
  * Flink-siddhi library integration test cases
  */
-public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
-
+public class SiddhiCEPITCase extends AbstractTestBase implements Serializable {
     @Rule
     public transient TemporaryFolder tempFolder = new TemporaryFolder();
 
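
StreamingMultipleProgramsTestBase is no longer shipped with recent Flink test utilities, hence the switch to AbstractTestBase from flink-test-utils, which runs all test methods of a class against one shared MiniCluster. A minimal sketch of the usage, assuming flink-test-utils is a test-scope dependency (the test class here is hypothetical):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.test.util.AbstractTestBase;
    import org.junit.Test;

    public class MiniClusterSmokeTest extends AbstractTestBase {
        @Test
        public void pipelineRunsOnSharedMiniCluster() throws Exception {
            // getExecutionEnvironment() resolves to the MiniCluster managed by the base class.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).map(i -> i * 2).print();
            env.execute("smoke test");
        }
    }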

File 8 of 11: StreamSchemaTest.java
@@ -17,19 +17,21 @@
 
 package org.apache.flink.streaming.siddhi.schema;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.siddhi.source.Event;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StreamSchemaTest {
+
     @Test
     public void testStreamSchemaWithPojo() {
         TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
@@ -41,16 +43,16 @@ public void testStreamSchemaWithPojo() {
 
     @Test
     public void testStreamSchemaWithTuple() {
-        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
-        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        TypeInformation<Tuple4<Integer,Long,String,Double>> typeInfo = TypeInformation.of(new TypeHint<Tuple4<Integer,Long,String,Double>>() {});
+        StreamSchema<Tuple4<Integer,Long,String,Double>> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
         assertEquals(4, schema.getFieldIndexes().length);
         assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
     }
 
     @Test
     public void testStreamSchemaWithPrimitive() {
-        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        TypeInformation<String> typeInfo = TypeInformation.of(new TypeHint<String>() {});
         StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
         assertEquals(String.class, schema.getTypeInfo().getTypeClass());
         assertEquals(1, schema.getFieldIndexes().length);
@@ -70,25 +72,25 @@ public void testStreamTupleSerializerWithPojo() {
         StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
 
-        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
-        assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString());
+        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = Types.TUPLE(TypeInformation.of(String.class), schema.getTypeInfo());
+        assertEquals("Java Tuple2<String, PojoType<org.apache.flink.streaming.siddhi.source.Event, fields = [id: Integer, name: String, price: Double, timestamp: Long]>>", tuple2TypeInformation.toString());
     }
 
     @Test
     public void testStreamTupleSerializerWithTuple() {
-        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
-        StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+        TypeInformation<Tuple4<Integer,Long,String,Double>> typeInfo = TypeInformation.of(new TypeHint<Tuple4<Integer,Long,String,Double>>() {});
+        StreamSchema<Tuple4<Integer,Long,String,Double>> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
-        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
-        assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
+        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = Types.TUPLE(TypeInformation.of(String.class), schema.getTypeInfo());
+        assertEquals("Java Tuple2<String, Java Tuple4<Integer, Long, String, Double>>", tuple2TypeInformation.toString());
     }
 
     @Test
     public void testStreamTupleSerializerWithPrimitive() {
-        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        TypeInformation<String> typeInfo = TypeInformation.of(new TypeHint<String>() {});
         StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
         assertEquals(String.class, schema.getTypeInfo().getTypeClass());
-        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = Types.TUPLE(TypeInformation.of(String.class), schema.getTypeInfo());
         assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString());
     }
 }
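
The updated assertions record a real behavioral difference, not just new syntax: built via Types.TUPLE from the schema's actual TypeInformation, the Event field is analyzed as a PojoType whose fields are serialized individually, whereas the old round-trip through a class-name string degraded it to a Kryo-backed GenericType. A standalone sketch of how the POJO analysis comes about (the Event class below is a stand-in):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class PojoVsGenericExample {
        // A Flink-style POJO: public no-arg constructor and public fields.
        public static class Event {
            public int id;
            public String name;
            public Event() {}
        }

        public static void main(String[] args) {
            // TypeExtractor recognizes the POJO shape:
            // PojoType<...Event, fields = [id: Integer, name: String]>
            TypeInformation<Event> pojoInfo = TypeExtractor.createTypeInfo(Event.class);
            System.out.println(pojoInfo);
        }
    }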

File 9 of 11: SiddhiTypeFactoryTest.java
@@ -20,23 +20,10 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class SiddhiTypeFactoryTest {
-    @Test
-    public void testTypeInfoParser() {
-        TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
-        Assert.assertNotNull(type1);
-        TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
-        Assert.assertNotNull(type2);
-    }
-
-    public static class InnerPojo {
-    }
-
     @Test
     public void testBuildTypeInformationForSiddhiStream() {
         String query = "define stream inputStream (timestamp long, name string, value double);"
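
The deleted testTypeInfoParser exercised only the removed parser API, so it is dropped rather than ported. If equivalent coverage were wanted, a TypeHint-based assertion could stand in (hypothetical test, not part of this commit):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.junit.Assert;
    import org.junit.Test;

    public class TypeHintCoverageTest {
        @Test
        public void typeHintBuildsTheSameTupleType() {
            // Same type the old test built from "Tuple3<String,Long,java.lang.Object>".
            TypeInformation<Tuple3<String, Long, Object>> type =
                TypeInformation.of(new TypeHint<Tuple3<String, Long, Object>>() {});
            Assert.assertNotNull(type);
        }
    }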

File 10 of 11: experimental/pom.xml (5 changes: 5 additions & 0 deletions)
@@ -27,6 +27,11 @@
       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
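
The new flink-json dependency presumably supplies the JSON (de)serialization schemas that live in their own module in newer Flink releases. One plausible pairing with the Kafka connector declared just below (a sketch under that assumption; topic name and broker address are placeholders):

    import java.util.Properties;

    import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class JsonKafkaSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            // flink-json parses each Kafka record into a Jackson ObjectNode.
            env.addSource(new FlinkKafkaConsumer010<ObjectNode>("events", new JsonNodeDeserializationSchema(), props))
               .print();
            env.execute("json kafka example");
        }
    }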

File 11 of 11: pom.xml (4 changes: 2 additions & 2 deletions)
@@ -34,8 +34,8 @@ under the License.
     <packaging>pom</packaging>
 
     <properties>
-        <siddhi.version>4.0.0-M120</siddhi.version>
-        <flink.version>1.3.0</flink.version>
+        <siddhi.version>4.2.40</siddhi.version>
+        <flink.version>1.7.0</flink.version>
         <scala.binary.version>2.11</scala.binary.version>
     </properties>
