diff --git a/connectors/connector-kafka-0.10/pom.xml b/connectors/connector-kafka-0.10/pom.xml
index 8f614f1e5..7138c9a64 100644
--- a/connectors/connector-kafka-0.10/pom.xml
+++ b/connectors/connector-kafka-0.10/pom.xml
@@ -5,7 +5,7 @@
alink_connectors
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
diff --git a/connectors/connector-kafka-0.11/pom.xml b/connectors/connector-kafka-0.11/pom.xml
index faf6bebc8..4003d7ec4 100644
--- a/connectors/connector-kafka-0.11/pom.xml
+++ b/connectors/connector-kafka-0.11/pom.xml
@@ -5,7 +5,7 @@
alink_connectors
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
diff --git a/connectors/connector-kafka-base/pom.xml b/connectors/connector-kafka-base/pom.xml
index 3a5d5b654..b7eabf62b 100644
--- a/connectors/connector-kafka-base/pom.xml
+++ b/connectors/connector-kafka-base/pom.xml
@@ -5,7 +5,7 @@
alink_connectors
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
diff --git a/connectors/connector-kafka/pom.xml b/connectors/connector-kafka/pom.xml
index 8a91604e6..ec91748eb 100644
--- a/connectors/connector-kafka/pom.xml
+++ b/connectors/connector-kafka/pom.xml
@@ -5,7 +5,7 @@
alink_connectors
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
diff --git a/connectors/connector-pravega/pom.xml b/connectors/connector-pravega/pom.xml
new file mode 100644
index 000000000..d4322e78f
--- /dev/null
+++ b/connectors/connector-pravega/pom.xml
@@ -0,0 +1,62 @@
+
+
+
+ alink_connectors
+ com.alibaba.alink
+ 1.1.0
+
+ 4.0.0
+
+
+ alink_connectors_pravega_flink-${alink.flink.major.version}_${alink.scala.major.version}
+ alink-connector-pravega
+
+ jar
+
+
+
+ com.alibaba.alink
+ alink_core_flink-${alink.flink.major.version}_${alink.scala.major.version}
+ ${project.version}
+ provided
+
+
+ org.apache.flink
+ flink-streaming-java_${alink.scala.major.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-streaming-scala_${alink.scala.major.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_${alink.scala.major.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-planner_${alink.scala.major.version}
+ ${flink.version}
+ provided
+
+
+
+ io.pravega
+ pravega-connectors-flink-${alink.flink.major.version}_2.12
+ ${pravega.version}
+
+
+
\ No newline at end of file
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java
new file mode 100644
index 000000000..8ecc28c1d
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java
@@ -0,0 +1,103 @@
+package com.alibaba.alink.operator.batch.sink;
+
+
+import com.alibaba.alink.common.MLEnvironmentFactory;
+import com.alibaba.alink.common.io.annotations.AnnotationUtils;
+import com.alibaba.alink.common.io.annotations.IOType;
+import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
+import com.alibaba.alink.operator.batch.BatchOperator;
+import com.alibaba.alink.operator.common.io.csv.CsvUtil;
+import com.alibaba.alink.operator.common.io.pravega.PravegaCsvRowSerializationSchema;
+import com.alibaba.alink.operator.common.io.pravega.PravegaJsonRowSerializationSchema;
+import com.alibaba.alink.params.io.PravegaSinkParams;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamConfiguration;
+import io.pravega.connectors.flink.FlinkPravegaOutputFormat;
+import io.pravega.connectors.flink.PravegaConfig;
+import io.pravega.connectors.flink.PravegaEventRouter;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.types.Row;
+
+import java.net.URI;
+import java.util.Collection;
+
+
+@IoOpAnnotation(name = "pravega", ioType = IOType.SinkBatch)
+public class PravegaSinkBatchOp extends BaseSinkBatchOp
+ implements PravegaSinkParams {
+
+
+ public PravegaSinkBatchOp() {
+ this(new Params());
+ }
+
+
+ public PravegaSinkBatchOp(Params params) {
+ super(AnnotationUtils.annotatedName(PravegaSinkBatchOp.class), params);
+ }
+
+ @Override
+ protected PravegaSinkBatchOp sinkFrom(BatchOperator in) {
+ final String pravegaControllerUri = getPravegaControllerUri();
+ final String pravegaScope = getPravegaScope();
+ final String pravegaStream = getPravegaStream();
+ final Boolean PravegaStandalone = getPravegaStandalone();
+ final String dataFormat = getDataFormat();
+ final String[] colNames = in.getColNames();
+ final String fieldDelimiter = CsvUtil.unEscape(getFieldDelimiter());
+
+        // NOTE(review): removed debug println — in.collect() executes the whole batch job, and collect() is called again below.
+
+ PravegaEventRouter router = new PravegaEventRouter() {
+ @Override
+ public String getRoutingKey(Row eventType) {
+ final String pravegaRoutingKey = getPravegaRoutingKey();
+ return pravegaRoutingKey;
+ }
+ };
+
+ PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
+ .withControllerURI(URI.create(pravegaControllerUri))
+ .withDefaultScope(pravegaScope)
+ //Enable it if with Nautilus
+ //.withCredentials(credentials)
+ .withHostnameValidation(false);
+
+
+ //Create Pravega Scope and Stream
+ Stream stream = pravegaConfig.resolve(pravegaStream);
+ try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) {
+ // create the requested scope (if standalone)
+ if (PravegaStandalone) {
+ streamManager.createScope(pravegaScope);
+ }
+ // create the requested stream based on the given stream configuration
+ streamManager.createStream(stream.getScope(), stream.getStreamName(), StreamConfiguration.builder().build());
+ }
+
+ SerializationSchema serializationSchema = null;
+ //serializationSchema = new JsonRowSerializationSchema(colNames);
+ if (dataFormat.equalsIgnoreCase("csv")) {
+ serializationSchema = new PravegaCsvRowSerializationSchema(fieldDelimiter);
+ } else if (dataFormat.equalsIgnoreCase("json")) {
+ serializationSchema = new PravegaJsonRowSerializationSchema(colNames);
+ }
+
+
+ FlinkPravegaOutputFormat outputFormat = FlinkPravegaOutputFormat.builder()
+ .forStream(pravegaStream)
+ .withPravegaConfig(pravegaConfig)
+ .withSerializationSchema(serializationSchema)
+ .withEventRouter(router)
+ .build();
+
+ Collection inputData = in.collect();
+ ExecutionEnvironment env = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
+
+ env.fromCollection(inputData).output(outputFormat).name("pravega_" + pravegaScope + "/" + pravegaStream);
+ return this;
+ }
+}
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java
new file mode 100644
index 000000000..e1aa16d71
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java
@@ -0,0 +1,77 @@
+package com.alibaba.alink.operator.batch.source;
+
+import com.alibaba.alink.common.MLEnvironmentFactory;
+import com.alibaba.alink.common.io.annotations.AnnotationUtils;
+import com.alibaba.alink.common.io.annotations.IOType;
+import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
+import com.alibaba.alink.common.utils.DataSetConversionUtil;
+import com.alibaba.alink.operator.common.io.csv.CsvUtil;
+import com.alibaba.alink.operator.common.io.pravega.PravegaRowDeserializationSchema;
+import com.alibaba.alink.params.io.PravegaSourceParams;
+import io.pravega.client.ClientConfig;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.ReaderGroup;
+import io.pravega.client.stream.StreamCut;
+import io.pravega.connectors.flink.FlinkPravegaInputFormat;
+import io.pravega.connectors.flink.PravegaConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+
+import java.net.URI;
+
+@IoOpAnnotation(name = "pravega", ioType = IOType.SourceBatch)
+public final class PravegaSourceBatchOp extends BaseSourceBatchOp
+ implements PravegaSourceParams {
+
+ public PravegaSourceBatchOp() {
+ this(new Params());
+ }
+
+
+ public PravegaSourceBatchOp(Params params) {
+ super(AnnotationUtils.annotatedName(PravegaSourceBatchOp.class), params);
+ }
+
+ @Override
+ protected Table initializeDataSource() {
+ String pravegaControllerUri = getPravegaControllerUri();
+ String pravegascope = getPravegaScope();
+ String pravegaStream = getPravegaStream();
+ StreamCut pravegaStartStreamCut = StreamCut.from(getPravegaStartStreamCut());
+ StreamCut pravegaEndStreamCut = StreamCut.from(getPravegaEndStreamCut());
+ String schemaStr = "event String";
+ //DeserializationSchema deserializationSchema = getPravegaDeserializer();
+ final String[] colNames = CsvUtil.getColNames(schemaStr);
+ final TypeInformation[] colTypes = CsvUtil.getColTypes(schemaStr);
+
+
+ //Properties props = new Properties();
+ //props.setProperty("pravegaControllerUri", pravegaControllerUri);
+ //props.setProperty("pravegascope", pravegascope);
+
+
+ PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
+ .withControllerURI(URI.create(pravegaControllerUri))
+ .withDefaultScope(pravegascope)
+ //Enable it if with Nautilus
+ //.withCredentials(credentials)
+ .withHostnameValidation(false);
+
+
+ FlinkPravegaInputFormat source = FlinkPravegaInputFormat.builder()
+ .withPravegaConfig(pravegaConfig)
+ .forStream(pravegaStream, pravegaStartStreamCut, pravegaEndStreamCut)
+ .withDeserializationSchema(new PravegaRowDeserializationSchema(Row.class))
+ .build();
+
+ ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
+
+ DataSource data = execEnv.createInput(source, TypeInformation.of(Row.class)).name("Pravega BatchReader");
+
+ return DataSetConversionUtil.toTable(getMLEnvironmentId(), data, colNames, colTypes);
+ }
+}
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java
new file mode 100644
index 000000000..22b4b9422
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java
@@ -0,0 +1,36 @@
+package com.alibaba.alink.operator.common.io.pravega;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+
+
+public class PravegaCsvRowSerializationSchema implements SerializationSchema {
+
+ private final String fieldDelimiter;
+
+ public PravegaCsvRowSerializationSchema(String fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ }
+
+ @Override
+ public byte[] serialize(Row row) {
+ StringBuilder sbd = new StringBuilder();
+ int n = row.getArity();
+ for (int i = 0; i < n; i++) {
+ Object obj = row.getField(i);
+ if (obj != null) {
+ sbd.append(obj);
+ }
+ if (i != n - 1) {
+ sbd.append(this.fieldDelimiter);
+ }
+ }
+ String str = sbd.toString();
+
+ try {
+ return str.getBytes("UTF-8");
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to serialize row", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java
new file mode 100644
index 000000000..838dcff4c
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java
@@ -0,0 +1,34 @@
+package com.alibaba.alink.operator.common.io.pravega;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+
+import static com.alibaba.alink.common.utils.JsonConverter.gson;
+
+public class PravegaJsonRowSerializationSchema implements SerializationSchema {
+
+ String[] colNames;
+
+ public PravegaJsonRowSerializationSchema(String[] colNames) {
+ this.colNames = colNames;
+ }
+
+ @Override
+ public byte[] serialize(Row row) {
+ HashMap map = new HashMap<>();
+ for (int i = 0; i < colNames.length; i++) {
+ Object obj = row.getField(i);
+ if (obj != null) {
+ map.put(colNames[i], obj);
+ }
+ }
+ String str = gson.toJson(map);
+ try {
+ return str.getBytes("UTF-8");
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to serialize row", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java
new file mode 100644
index 000000000..355fbee76
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java
@@ -0,0 +1,34 @@
+package com.alibaba.alink.operator.common.io.pravega;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+public class PravegaRowDeserializationSchema implements DeserializationSchema {
+
+ private Class valueType;
+
+ public PravegaRowDeserializationSchema(Class valueType) {
+ this.valueType = valueType;
+ }
+
+ @Override
+ public T deserialize(byte[] event) throws IOException {
+ Row row = new Row(1);
+ row.setField(0, event != null ? new String(event, Charset.forName("UTF-8")) : null);
+ return (T) row;
+ }
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(valueType);
+ }
+}
\ No newline at end of file
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java
new file mode 100644
index 000000000..36f504719
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java
@@ -0,0 +1,116 @@
+package com.alibaba.alink.operator.stream.sink;
+
+
+import com.alibaba.alink.common.io.annotations.AnnotationUtils;
+import com.alibaba.alink.common.io.annotations.IOType;
+import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
+import com.alibaba.alink.operator.common.io.csv.CsvUtil;
+import com.alibaba.alink.operator.common.io.pravega.PravegaCsvRowSerializationSchema;
+import com.alibaba.alink.operator.common.io.pravega.PravegaJsonRowSerializationSchema;
+import com.alibaba.alink.operator.stream.StreamOperator;
+import com.alibaba.alink.params.io.PravegaSinkParams;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamConfiguration;
+import io.pravega.connectors.flink.*;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+
+import java.net.URI;
+
+
+/**
+ * Data Sink for pravega.
+ */
+@IoOpAnnotation(name = "pravega", hasTimestamp = true, ioType = IOType.SinkStream)
+public class PravegaSinkStreamOp extends BaseSinkStreamOp
+ implements PravegaSinkParams {
+
+ public PravegaSinkStreamOp() {
+ this(new Params());
+ }
+
+
+ public PravegaSinkStreamOp(Params params) {
+ super(AnnotationUtils.annotatedName(PravegaSinkStreamOp.class), params);
+ }
+
+ @Override
+ protected PravegaSinkStreamOp sinkFrom(StreamOperator in) {
+ final String pravegaControllerUri = getPravegaControllerUri();
+ final String pravegaScope = getPravegaScope();
+ final String pravegaStream = getPravegaStream();
+ final Boolean PravegaStandalone = getPravegaStandalone();
+ final String dataFormat = getDataFormat();
+ final String[] colNames = in.getColNames();
+ final String fieldDelimiter = CsvUtil.unEscape(getFieldDelimiter());
+
+        // NOTE(review): removed debug println of the DataStream object.
+
+ //Fetch the Pravega writer mode
+        //Fetch the Pravega writer mode
+        PravegaWriterMode writerMode = null;
+        final String pravegawritemode = getPravegaWriterMode();
+        switch (pravegawritemode.toUpperCase()) {
+            case ("ATLEAST_ONCE"):
+                writerMode = PravegaWriterMode.ATLEAST_ONCE;
+                break;
+            case ("EXACTLY_ONCE"):
+                writerMode = PravegaWriterMode.EXACTLY_ONCE;
+                break;
+            case ("BEST_EFFORT"):
+                writerMode = PravegaWriterMode.BEST_EFFORT;
+                break;
+            default:
+                // Fail fast instead of passing a null writer mode to the Pravega writer builder (would NPE later).
+                throw new IllegalArgumentException("Unsupported pravega writer mode: " + pravegawritemode);
+        }
+
+ //Generate the routing key
+ PravegaEventRouter router = new PravegaEventRouter() {
+ @Override
+ public String getRoutingKey(Row eventType) {
+ final String pravegaRoutingKey = getPravegaRoutingKey();
+ return pravegaRoutingKey;
+ }
+ };
+
+ PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
+ .withControllerURI(URI.create(pravegaControllerUri))
+ .withDefaultScope(pravegaScope)
+ //Enable it if with Nautilus
+ //.withCredentials(credentials)
+ .withHostnameValidation(false);
+
+
+ //Create Pravega Scope and Stream in case it does not exist.
+ Stream stream = pravegaConfig.resolve(pravegaStream);
+ try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) {
+ // create the requested scope (if standalone)
+ if (PravegaStandalone) {
+ streamManager.createScope(pravegaScope);
+ }
+ // create the requested stream based on the given stream configuration
+ streamManager.createStream(stream.getScope(), stream.getStreamName(), StreamConfiguration.builder().build());
+ }
+
+ SerializationSchema serializationSchema = null;
+ //serializationSchema = new JsonRowSerializationSchema(colNames);
+ if (dataFormat.equalsIgnoreCase("csv")) {
+ serializationSchema = new PravegaCsvRowSerializationSchema(fieldDelimiter);
+ } else if (dataFormat.equalsIgnoreCase("json")) {
+ serializationSchema = new PravegaJsonRowSerializationSchema(colNames);
+ }
+
+ FlinkPravegaWriter pravegaSink = FlinkPravegaWriter.builder()
+ .forStream(pravegaStream)
+ .withPravegaConfig(pravegaConfig)
+ .withSerializationSchema(serializationSchema)
+ .withEventRouter(router)
+ .withWriterMode(writerMode)
+ .build();
+
+ DataStream inputData = in.getDataStream();
+ inputData.addSink(pravegaSink).name("pravega_" + pravegaScope + "/" + pravegaStream);
+
+ return this;
+ }
+}
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java
new file mode 100644
index 000000000..3c07b93e3
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java
@@ -0,0 +1,87 @@
+package com.alibaba.alink.operator.stream.source;
+
+import com.alibaba.alink.common.MLEnvironmentFactory;
+import com.alibaba.alink.common.io.annotations.AnnotationUtils;
+import com.alibaba.alink.common.io.annotations.IOType;
+import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
+import com.alibaba.alink.common.utils.DataStreamConversionUtil;
+import com.alibaba.alink.operator.common.io.csv.CsvUtil;
+import com.alibaba.alink.operator.common.io.pravega.PravegaRowDeserializationSchema;
+import com.alibaba.alink.params.io.PravegaSourceParams;
+import io.pravega.client.ClientConfig;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.StreamCut;
+import io.pravega.connectors.flink.FlinkPravegaReader;
+import io.pravega.connectors.flink.PravegaConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+
+import java.net.URI;
+import java.util.Properties;
+
+/**
+ * Data source for pravega.
+ */
+@IoOpAnnotation(name = "pravega", hasTimestamp = true, ioType = IOType.SourceStream)
+public class PravegaSourceStreamOp extends BaseSourceStreamOp
+ implements PravegaSourceParams {
+
+ public PravegaSourceStreamOp() {
+ this(new Params());
+ }
+
+
+ public PravegaSourceStreamOp(Params params) {
+ super(AnnotationUtils.annotatedName(PravegaSourceStreamOp.class), params);
+ }
+
+ @Override
+ protected Table initializeDataSource() {
+ String pravegaControllerUri = getPravegaControllerUri();
+ String pravegascope = getPravegaScope();
+ String pravegaStream = getPravegaStream();
+        StreamCut pravegaStartStreamCut = null;
+        if (getPravegaStartStreamCut().equals("UNBOUNDED")) {
+            ClientConfig clientConfig = ClientConfig.builder().controllerURI(URI.create(pravegaControllerUri)).build();
+            // StreamManager is AutoCloseable and holds network resources — close it (try-with-resources),
+            // matching the usage in the sink operators.
+            try (StreamManager streamManager = StreamManager.create(clientConfig)) {
+                pravegaStartStreamCut = streamManager.getStreamInfo(pravegascope, pravegaStream).getTailStreamCut();
+            }
+        } else {
+            pravegaStartStreamCut = StreamCut.from(getPravegaStartStreamCut());
+        }
+ StreamCut pravegaEndStreamCut = StreamCut.from(getPravegaEndStreamCut());
+ String schemaStr = "event String";
+ final String[] colNames = CsvUtil.getColNames(schemaStr);
+ final TypeInformation[] colTypes = CsvUtil.getColTypes(schemaStr);
+
+
+ Properties props = new Properties();
+ props.setProperty("pravegaControllerUri", pravegaControllerUri);
+ props.setProperty("pravegascope", pravegascope);
+
+
+ PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
+ .withControllerURI(URI.create(pravegaControllerUri))
+ .withDefaultScope(pravegascope)
+ //Enable it if with Nautilus
+ //.withCredentials(credentials)
+ .withHostnameValidation(false);
+
+ /*Pravega pravega = new Pravega();
+ pravega.tableSourceReaderBuilder()
+ .forStream(pravegaStream)*/
+
+ FlinkPravegaReader source = FlinkPravegaReader.builder()
+ .withPravegaConfig(pravegaConfig)
+ .forStream(pravegaStream, pravegaStartStreamCut, pravegaEndStreamCut)
+ .withDeserializationSchema(new PravegaRowDeserializationSchema(Row.class))
+ .build();
+
+ DataStream data = MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamExecutionEnvironment()
+ .addSource(source).name("Pravega StreamReader");
+ return DataStreamConversionUtil.toTable(getMLEnvironmentId(), data, colNames, colTypes);
+ }
+}
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java
new file mode 100644
index 000000000..229788d96
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java
@@ -0,0 +1,118 @@
+package com.alibaba.alink.params.io;
+
+import com.alibaba.alink.params.io.HasFieldDelimiterDvComma;
+import com.alibaba.alink.params.io.shared_params.HasDataFormat;
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.WithParams;
+
+public interface PravegaSinkParams extends WithParams, HasDataFormat, HasFieldDelimiterDvComma {
+
+ /**
+ * Parameter of pravega controller uri
+ **/
+ ParamInfo PRAVEGA_CONTROLLER_URI = ParamInfoFactory
+ .createParamInfo("pravega_controller_uri", String.class)
+ .setDescription("pravega controller uri")
+ .setRequired()
+ .setAlias(new String[]{"pravega.controllerUri", "pravega_controller_uri"})
+ .build();
+ /**
+ * Parameter of pravega scope
+ **/
+ ParamInfo PRAVEGA_SCOPE = ParamInfoFactory
+ .createParamInfo("pravega_scope", String.class)
+ .setDescription("pravega scope")
+ .setRequired()
+ .setAlias(new String[]{"pravega.scope", "pravega_scope"})
+ .build();
+ ParamInfo PRAVEGA_STREAM = ParamInfoFactory
+ .createParamInfo("pravega_stream", String.class)
+ .setDescription("pravega stream")
+ .setRequired()
+ .setAlias(new String[]{"pravega.stream", "pravega_stream"})
+ .build();
+ /**
+ * Parameter of pravega writer mode, only be used by the PravegaSinkStreamOp
+ **/
+ ParamInfo PRAVEGA_WRITER_MODE = ParamInfoFactory
+ .createParamInfo("pravega_writer_mode", String.class)
+ .setDescription("pravega writer mode")
+ .setOptional()
+ .setHasDefaultValue("ATLEAST_ONCE")
+ .setAlias(new String[]{"pravega.writerMode", "pravega_writer_mode"})
+ .build();
+ /**
+ * Parameter of pravega standalone indicator
+ **/
+ ParamInfo PRAVEGA_ROUTING_KEY = ParamInfoFactory
+ .createParamInfo("pravega_routing_key", String.class)
+ .setDescription("pravega routing key")
+ .setOptional()
+ .setHasDefaultValue("default")
+ .setAlias(new String[]{"pravega.routingKey", "pravega_routing_key"})
+ .build();
+ /**
+ * Parameter of pravega standalone indicator
+ **/
+ ParamInfo PRAVEGA_STANDALONE = ParamInfoFactory
+            .createParamInfo("pravega_standalone", Boolean.class)
+ .setDescription("pravega standalone")
+ .setOptional()
+ .setHasDefaultValue(true)
+ .setAlias(new String[]{"pravega.standalone", "pravega_standalone"})
+ .build();
+
+ default String getPravegaControllerUri() {
+ return get(PRAVEGA_CONTROLLER_URI);
+ }
+
+ default T setPravegaControllerUri(String value) {
+ return set(PRAVEGA_CONTROLLER_URI, value);
+ }
+
+ default String getPravegaScope() {
+ return get(PRAVEGA_SCOPE);
+ }
+
+ /**
+ * Parameter of pravega stream
+ **/
+ default T setPravegaScope(String value) {
+ return set(PRAVEGA_SCOPE, value);
+ }
+
+ default String getPravegaStream() {
+ return get(PRAVEGA_STREAM);
+ }
+
+ default T setPravegaStream(String value) {
+ return set(PRAVEGA_STREAM, value);
+ }
+
+ default String getPravegaWriterMode() {
+ return get(PRAVEGA_WRITER_MODE);
+ }
+
+    default T setPravegaWriterMode(String value) {
+ return set(PRAVEGA_WRITER_MODE, value);
+ }
+
+ default String getPravegaRoutingKey() {
+ return get(PRAVEGA_ROUTING_KEY);
+ }
+
+ default T setPravegaRoutingKey(String value) {
+ return set(PRAVEGA_ROUTING_KEY, value);
+ }
+
+ default Boolean getPravegaStandalone() {
+ return get(PRAVEGA_STANDALONE);
+ }
+
+ default T setPravegaStandalone(Boolean value) {
+ return set(PRAVEGA_STANDALONE, value);
+ }
+
+
+}
diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java
new file mode 100644
index 000000000..347bb22ff
--- /dev/null
+++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java
@@ -0,0 +1,97 @@
+package com.alibaba.alink.params.io;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+public interface PravegaSourceParams extends WithParams {
+
+ ParamInfo PRAVEGA_CONTROLLER_URI = ParamInfoFactory
+ .createParamInfo("pravega_controller_uri", String.class)
+ .setDescription("pravega controller uri")
+ .setRequired()
+ .setAlias(new String[]{"pravega.controller.uri", "pravega_controller_uri"})
+ .build();
+ ParamInfo PRAVEGA_SCOPE = ParamInfoFactory
+ .createParamInfo("pravega_scope", String.class)
+ .setDescription("pravega scope")
+ .setRequired()
+ .setAlias(new String[]{"pravega.scope", "pravega_scope"})
+ .build();
+ ParamInfo PRAVEGA_STREAM = ParamInfoFactory
+ .createParamInfo("pravega_stream", String.class)
+ .setDescription("pravega stream")
+ .setRequired()
+ .setAlias(new String[]{"pravega.stream", "pravega_stream"})
+ .build();
+ ParamInfo pravega_deserializer = ParamInfoFactory
+            .createParamInfo("pravega_deserializer", DeserializationSchema.class)
+ .setDescription("pravega deserializer")
+ .setRequired()
+ .setAlias(new String[]{"pravega.deserializer", "pravega_deserializer"})
+ .build();
+ ParamInfo pravega_startStreamCut = ParamInfoFactory
+            .createParamInfo("pravega_startStreamCut", String.class)
+ .setDescription("pravega startStreamCut")
+ .setHasDefaultValue("UNBOUNDED")
+ .setOptional()
+ .setAlias(new String[]{"pravega.startStreamCut", "pravega_startStreamCut"})
+ .build();
+    ParamInfo pravega_endStreamCut = ParamInfoFactory
+            .createParamInfo("pravega_endStreamCut", String.class)
+            .setDescription("pravega endStreamCut")
+            .setHasDefaultValue("UNBOUNDED")
+            .setOptional()
+            .setAlias(new String[]{"pravega.endStreamCut", "pravega_endStreamCut"})
+            .build();
+
+ default String getPravegaControllerUri() {
+ return get(PRAVEGA_CONTROLLER_URI);
+ }
+
+ default T setPravegaControllerUri(String value) {
+ return set(PRAVEGA_CONTROLLER_URI, value);
+ }
+
+ default String getPravegaScope() {
+ return get(PRAVEGA_SCOPE);
+ }
+
+ default T setPravegaScope(String value) {
+ return set(PRAVEGA_SCOPE, value);
+ }
+
+ default String getPravegaStream() {
+ return get(PRAVEGA_STREAM);
+ }
+
+ default T setPravegaStream(String value) {
+ return set(PRAVEGA_STREAM, value);
+ }
+
+ default DeserializationSchema getPravegaDeserializer() {
+ return get(pravega_deserializer);
+ }
+
+ default T setPravegaDeserializer(DeserializationSchema value) {
+ return set(pravega_deserializer, value);
+ }
+
+ default String getPravegaStartStreamCut() {
+ return get(pravega_startStreamCut);
+ }
+
+ default T setPravegaStartStreamCut(String value) {
+ return set(pravega_startStreamCut, value);
+ }
+
+ default String getPravegaEndStreamCut() {
+ return get(pravega_endStreamCut);
+ }
+
+ default T setPravegaEndStreamCut(String value) {
+ return set(pravega_endStreamCut, value);
+ }
+
+}
diff --git a/connectors/pom.xml b/connectors/pom.xml
index c903e1997..185d6f3cc 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -5,7 +5,7 @@
alink
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
@@ -19,6 +19,7 @@
connector-kafka-0.10
connector-kafka-0.11
connector-kafka
+ connector-pravega
diff --git a/core/pom.xml b/core/pom.xml
index a0f1291c8..352421413 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -5,7 +5,7 @@
alink
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
diff --git a/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java b/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java
index de8cda260..c82ad814c 100644
--- a/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java
+++ b/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java
@@ -101,32 +101,17 @@ public MLEnvironment(
* if the ExecutionEnvironment has not been set, it initial the ExecutionEnvironment
* with default Configuration.
*
- * After flink 1.10, The memory configuration is unified(FLIP-49), and the configuration of
- * managed memory and network memory can only be set by 'taskmanager.memory.managed.size'
- * and 'taskmanager.memory.network.min' in local execution environment.
- *
- * @see org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils#adjustForLocalExecution
- *
* @return the batch {@link ExecutionEnvironment}
*/
public ExecutionEnvironment getExecutionEnvironment() {
if (null == env) {
if (ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
- final int managedMemPerCoreInMB = 64;
- final int networkMemPerCoreInMB = 64;
- final int core = Runtime.getRuntime().availableProcessors();
-
Configuration conf = new Configuration();
- conf.setString(
- "taskmanager.memory.managed.size",
- String.format("%dm", managedMemPerCoreInMB * core)
- );
- conf.setString(
- "taskmanager.memory.network.min",
- String.format("%dm", networkMemPerCoreInMB * core)
- );
+ conf.setBoolean("taskmanager.memory.preallocate", true);
+ conf.setBoolean("taskmanager.memory.off-heap", true);
+ conf.setFloat("taskmanager.memory.fraction", 0.3f);
env = ExecutionEnvironment.createLocalEnvironment(conf);
- env.setParallelism(core);
+ env.setParallelism(Runtime.getRuntime().availableProcessors());
} else {
env = ExecutionEnvironment.getExecutionEnvironment();
}
diff --git a/examples/pom.xml b/examples/pom.xml
index d5231a004..0193eae86 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -5,7 +5,7 @@
alink
com.alibaba.alink
- 1.1-SNAPSHOT
+ 1.1.0
4.0.0
@@ -43,6 +43,12 @@
flink-table-planner_${alink.scala.major.version}
${flink.version}
+
+ com.alibaba.alink
+ alink_connectors_pravega_flink-1.9_2.11
+ 1.1.0
+ compile
+
diff --git a/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java b/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java
new file mode 100644
index 000000000..e2cdf1032
--- /dev/null
+++ b/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java
@@ -0,0 +1,165 @@
+package io.pravega.alink;
+
+import com.alibaba.alink.operator.batch.BatchOperator;
+import com.alibaba.alink.operator.batch.dataproc.AppendIdBatchOp;
+import com.alibaba.alink.operator.batch.dataproc.JsonValueBatchOp;
+import com.alibaba.alink.operator.batch.regression.LinearRegTrainBatchOp;
+import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
+import com.alibaba.alink.operator.batch.sink.PravegaSinkBatchOp;
+import com.alibaba.alink.operator.batch.source.PravegaSourceBatchOp;
+import com.alibaba.alink.operator.common.linear.LinearModelType;
+import com.alibaba.alink.operator.stream.dataproc.JsonValueStreamOp;
+import com.alibaba.alink.pipeline.regression.LinearRegression;
+import com.alibaba.alink.operator.stream.StreamOperator;
+import com.alibaba.alink.operator.stream.source.PravegaSourceStreamOp;
+import com.alibaba.alink.pipeline.regression.LinearRegressionModel;
+import io.pravega.alink.common.CommonParams;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class PravegaLinearRegressionExample {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PravegaLinearRegressionExample.class);
+ public final String scope;
+ public final String streamName;
+ public final String controllerURI;
+
+
+ public PravegaLinearRegressionExample(String scope, String streamName, String controllerURI) {
+ this.scope = scope;
+ this.streamName = streamName;
+ this.controllerURI = controllerURI;
+ }
+
+ public PravegaLinearRegressionExample PravegaStreamML(String scope, String streamName, String controllerUri) throws Exception {
+ PravegaSourceStreamOp source =
+ new PravegaSourceStreamOp()
+ .setPravegaControllerUri(controllerUri)
+ .setPravegaScope(scope)
+ .setPravegaStream(streamName);
+
+
+ StreamOperator data = source
+ .link (
+ new JsonValueStreamOp()
+ .setSelectedCol("event").setReservedCols(new String[] {})
+ .setOutputCols(
+ new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"}
+ )
+ .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"})
+ )
+ .select("CAST(DateTime AS TIMESTAMP) AS DateTime" +
+ "CAST(PDP2GGERDJKS01000000_2 AS FLOAT) AS PDP2GGERDJKS01000000_2" +
+ "CAST(PDP2GHDEDJKS01000017_2 AS FLOAT) AS PDP2GHDEDJKS01000017_2"
+ );
+
+ //LinearRegPredictStreamOp LR = new LinearRegPredictStreamOp(data )
+ System.out.print(data.getSchema());
+ data.print();
+ StreamOperator.execute();
+ return this;
+ }
+
+
+ public PravegaLinearRegressionExample PravegaBatchmML(String scope, String streamName, String controllerUri) throws Exception {
+
+ String labelColName = "PDP2GGERDJKS01000000_2";
+ String[] selectedColNames = new String[] {"append_id"};
+
+
+ //Pravega Source Batch Op
+ System.out.println("Starting to read the dataset from Pravega........");
+ PravegaSourceBatchOp source =
+ new PravegaSourceBatchOp()
+ .setPravegaControllerUri(controllerUri)
+ .setPravegaScope(scope)
+ .setPravegaStream(streamName);
+
+
+ BatchOperator data = source.link (
+ new JsonValueBatchOp()
+ .setSelectedCol("event")
+ .setReservedCols(new String[] {})
+ .setOutputCols(
+ new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"}
+ )
+ .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"})
+ )
+ .select("CAST(DateTime AS TIMESTAMP) AS DateTime, " +
+ "CAST(PDP2GGERDJKS01000000_2 AS FLOAT) AS PDP2GGERDJKS01000000_2, " +
+ "CAST(PDP2GHDEDJKS01000017_2 AS FLOAT) AS PDP2GHDEDJKS01000017_2");
+
+ System.out.print("Orignal Table Schema" + data.getSchema());
+ data.print();
+
+
+ BatchOperator pravegadata = data.link(new AppendIdBatchOp());
+
+ System.out.print("pravegadata Table Schema" + pravegadata.getSchema());
+ pravegadata.print();
+
+ //Single Linear Regression Example
+ System.out.println("Starting to run the Linear Regression on the original dataset........");
+ LinearRegression lrop = new LinearRegression()
+ .setLabelCol(labelColName)
+ .setFeatureCols(selectedColNames)
+ .setPredictionCol("pre");
+
+ LinearRegTrainBatchOp LRTrain = new LinearRegTrainBatchOp()
+ .setFeatureCols(selectedColNames)
+ .setLabelCol(labelColName);
+
+
+ List dataSet = lrop.fit(pravegadata).transform(pravegadata).collect();
+
+ String[] a = dataSet.get(0).toString().split(",");
+ String[] b = dataSet.get(dataSet.size()-1).toString().split(",");
+
+ Double diff = ((Double.parseDouble(b[b.length-1]))-(Double.parseDouble(a[a.length-1])))/Double.parseDouble(a[a.length-1]);
+ System.out.println(a[a.length-1]);
+ System.out.println(b[b.length-1]);
+ System.out.println(diff);
+
+ // Linear Regression model sink to Pravega
+ BatchOperator LRModel = pravegadata.link(LRTrain).link (new PravegaSinkBatchOp()
+ .setDataFormat("csv")
+ .setPravegaControllerUri("tcp://localhost:9090")
+ .setPravegaScope("workshop-samples")
+ .setPravegaStream("stream"));
+ BatchOperator.execute();
+
+ PravegaSourceBatchOp newsource =
+ new PravegaSourceBatchOp()
+ .setPravegaControllerUri("tcp://localhost:9090")
+ .setPravegaScope("workshop-samples")
+ .setPravegaStream("stream");
+ System.out.print("Pravega Table Schema" + newsource.getSchema());
+ newsource.print();
+
+
+
+ return this;
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("########## FlinkSQLJOINReader START #############");
+ CommonParams.init(args);
+ String scope = CommonParams.getParam("pravega_scope");
+ String streamName = CommonParams.getParam("stream_name");
+ String controllerURI = CommonParams.getParam("pravega_controller_uri");
+
+ LOG.info("####################### SCOPE ###################### " + scope);
+ LOG.info("####################### streamName ###################### " + streamName);
+ LOG.info("####################### controllerURI ###################### " + controllerURI);
+ PravegaLinearRegressionExample PravegaLinearRegressionExample = new PravegaLinearRegressionExample (scope, streamName, controllerURI);
+ PravegaLinearRegressionExample.PravegaBatchmML(scope, streamName, controllerURI);
+ }
+}
+
diff --git a/examples/src/main/java/io/pravega/alink/common/CommonParams.java b/examples/src/main/java/io/pravega/alink/common/CommonParams.java
new file mode 100644
index 000000000..e2920d0ae
--- /dev/null
+++ b/examples/src/main/java/io/pravega/alink/common/CommonParams.java
@@ -0,0 +1,116 @@
+package io.pravega.alink.common;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import java.net.URI;
+
+// All parameters will come from environment variables. This makes it easy
+// to configure on Docker, Mesos, Kubernetes, etc.
+public class CommonParams {
+ // By default, we will connect to a standalone Pravega running on localhost.
+ public static URI getControllerURI() {
+ return URI.create(getEnvVar("PRAVEGA_CONTROLLER_URI", "tcp://localhost:9090"));
+ }
+
+ public static boolean isPravegaStandalone() {
+ return Boolean.parseBoolean(getEnvVar("pravega_standalone", "true"));
+ }
+
+ public static String getUser() {
+ return getEnvVar("PRAVEGA_STANDALONE_USER", "admin");
+ }
+
+ public static String getPassword() {
+ return getEnvVar("PRAVEGA_STANDALONE_PASSWORD", "1111_aaaa");
+ }
+
+ public static String getScope() {
+ return getEnvVar("PRAVEGA_SCOPE", "workshop-samples");
+ }
+
+ public static String getStreamName() {
+ return getEnvVar("STREAM_NAME", "workshop-stream");
+ }
+
+ public static String getRoutingKeyAttributeName() {
+ return getEnvVar("ROUTING_KEY_ATTRIBUTE_NAME", "test");
+ }
+
+ public static String getMessage() {
+ return getEnvVar("MESSAGE", "This is Nautilus OE team workshop samples.");
+ }
+
+ private static String getEnvVar(String name, String defaultValue) {
+ String value = System.getenv(name);
+ if (value == null || value.isEmpty()) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+ public static URI getGatewayURI() {
+ return URI.create(getEnvVar("GATEWAY_URI", "http://0.0.0.0:3000/"));
+ }
+
+ public static int getTargetRateEventsPerSec() {
+ return Integer.parseInt(getEnvVar("PRAVEGA_TARGET_RATE_EVENTS_PER_SEC", "100"));
+ }
+
+ public static int getScaleFactor() {
+ return Integer.parseInt(getEnvVar("PRAVEGA_SCALE_FACTOR", "2"));
+ }
+
+ public static int getMinNumSegments() {
+ return Integer.parseInt(getEnvVar("PRAVEGA_MIN_NUM_SEGMENTS", "1"));
+ }
+
+ public static int getListenPort() {
+ return Integer.parseInt(getEnvVar("LISTEN_PORT", "54672"));
+ }
+
+ public static ParameterTool params = null;
+
+ public static void init(String[] args) {
+ params = ParameterTool.fromArgs(args);
+ }
+
+ public static String getParam(String key) {
+ if (params != null && params.has(key)) {
+ return params.get(key);
+ } else {
+ return getDefaultParam(key);
+ }
+ }
+
+ private static String getDefaultParam(String key) {
+ String keyValue = null;
+ if (key != null) {
+ switch (key) {
+ case "pravega_scope":
+ keyValue = "workshop-samples";
+ break;
+ case "stream_name":
+ keyValue = "workshop-stream";
+ break;
+ case "pravega_controller_uri":
+ keyValue = "tcp://localhost:9090";
+ break;
+ case "routing_key_attribute_name":
+ keyValue = "100";
+ break;
+ case "pravega_standalone":
+ keyValue = "true";
+ break;
+ case "data_file":
+ keyValue = "initial_data_result_nozero.csv";
+ break;
+ case "message":
+ keyValue = "To demonstrate Nautilus streams sending a string message";
+ break;
+ default:
+ keyValue = null;
+ }
+ }
+ return keyValue;
+ }
+}
\ No newline at end of file
diff --git a/examples/src/main/java/io/pravega/alink/test_readFromCsv.java b/examples/src/main/java/io/pravega/alink/test_readFromCsv.java
new file mode 100644
index 000000000..72888eb78
--- /dev/null
+++ b/examples/src/main/java/io/pravega/alink/test_readFromCsv.java
@@ -0,0 +1,34 @@
+package io.pravega.alink;
+
+import com.alibaba.alink.operator.batch.BatchOperator;
+import com.alibaba.alink.operator.batch.dataproc.AppendIdBatchOp;
+import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
+import com.alibaba.alink.pipeline.regression.LinearRegression;
+
+
+public class test_readFromCsv {
+ public static void main(String[] args) throws Exception {
+
+ String csvSchema = "time Timestamp ,PDP2GGERDJKS01000000_2 Double, PDP2GHDEDJKS01000017_2 Double";
+ CsvSourceBatchOp csvSource = new CsvSourceBatchOp().setFilePath("C:\\Sandbox\\initial_data_result_nozero_alink_test.csv").setIgnoreFirstLine(true).setSchemaStr(csvSchema);
+
+ System.out.println("Orignal Table Schema........");
+ System.out.print( csvSource.getSchema());
+ csvSource.print();
+
+
+ BatchOperator csvdata = csvSource.link(new AppendIdBatchOp());
+ System.out.println("Table Schema after added AppendId........");
+ csvdata.print();
+ System.out.print( csvSource.getSchema());
+
+ LinearRegression lrop = new LinearRegression()
+ .setLabelCol("PDP2GGERDJKS01000000_2")
+ .setFeatureCols(new String[]{"append_id"})
+ .setPredictionCol("pre");
+
+ lrop.fit(csvdata).transform(csvdata).print();
+ }
+
+}
+
diff --git a/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java b/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java
new file mode 100644
index 000000000..2fb04fb66
--- /dev/null
+++ b/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java
@@ -0,0 +1,49 @@
+package io.pravega.alink;
+
+
+import com.alibaba.alink.operator.batch.BatchOperator;
+import com.alibaba.alink.operator.batch.dataproc.JsonValueBatchOp;
+import com.alibaba.alink.operator.batch.sink.PravegaSinkBatchOp;
+import com.alibaba.alink.operator.batch.source.PravegaSourceBatchOp;
+
+
+public class test_readWriteWithPravega {
+ public static void main(String[] args) throws Exception {
+
+ System.out.println("Starting to read the dataset from Pravega........");
+
+ PravegaSourceBatchOp source =
+ new PravegaSourceBatchOp()
+ .setPravegaControllerUri("tcp://localhost:9090")
+ .setPravegaScope("workshop-samples")
+ .setPravegaStream("workshop-stream");
+ BatchOperator op = source.link (
+ new JsonValueBatchOp()
+ .setSelectedCol("event")
+ .setReservedCols(new String[] {})
+ .setOutputCols(
+ new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"}
+ )
+ .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"})
+ );
+ System.out.print("Pravega Table Schema" + op.getSchema());
+
+ PravegaSinkBatchOp sink = new PravegaSinkBatchOp()
+ .setDataFormat("csv")
+ .setPravegaControllerUri("tcp://localhost:9090")
+ .setPravegaScope("workshop-samples")
+ .setPravegaStream("stream");
+ sink.linkFrom(op);
+ BatchOperator.execute();
+
+ PravegaSourceBatchOp newsource =
+ new PravegaSourceBatchOp()
+ .setPravegaControllerUri("tcp://localhost:9090")
+ .setPravegaScope("workshop-samples")
+ .setPravegaStream("stream");
+ System.out.print("Pravega Table Schema" + newsource.getSchema());
+ newsource.print();
+
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b916ebea2..7f053e90f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.alibaba.alink
alink
pom
- 1.1-SNAPSHOT
+ 1.1.0
alink
Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform.
@@ -91,14 +91,15 @@
${java.version}
${java.version}
UTF-8
- 1.10
- 1.10.0
+ 1.9
+ 1.9.0
2.11
2.11.11
4.5.3
2.8.2
3.3.2
0.11.2
+ 0.6.2