diff --git a/connectors/connector-kafka-0.10/pom.xml b/connectors/connector-kafka-0.10/pom.xml index 8f614f1e5..7138c9a64 100644 --- a/connectors/connector-kafka-0.10/pom.xml +++ b/connectors/connector-kafka-0.10/pom.xml @@ -5,7 +5,7 @@ alink_connectors com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 diff --git a/connectors/connector-kafka-0.11/pom.xml b/connectors/connector-kafka-0.11/pom.xml index faf6bebc8..4003d7ec4 100644 --- a/connectors/connector-kafka-0.11/pom.xml +++ b/connectors/connector-kafka-0.11/pom.xml @@ -5,7 +5,7 @@ alink_connectors com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 diff --git a/connectors/connector-kafka-base/pom.xml b/connectors/connector-kafka-base/pom.xml index 3a5d5b654..b7eabf62b 100644 --- a/connectors/connector-kafka-base/pom.xml +++ b/connectors/connector-kafka-base/pom.xml @@ -5,7 +5,7 @@ alink_connectors com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 diff --git a/connectors/connector-kafka/pom.xml b/connectors/connector-kafka/pom.xml index 8a91604e6..ec91748eb 100644 --- a/connectors/connector-kafka/pom.xml +++ b/connectors/connector-kafka/pom.xml @@ -5,7 +5,7 @@ alink_connectors com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 diff --git a/connectors/connector-pravega/pom.xml b/connectors/connector-pravega/pom.xml new file mode 100644 index 000000000..d4322e78f --- /dev/null +++ b/connectors/connector-pravega/pom.xml @@ -0,0 +1,62 @@ + + + + alink_connectors + com.alibaba.alink + 1.1.0 + + 4.0.0 + + + alink_connectors_pravega_flink-${alink.flink.major.version}_${alink.scala.major.version} + alink-connector-pravega + + jar + + + + com.alibaba.alink + alink_core_flink-${alink.flink.major.version}_${alink.scala.major.version} + ${project.version} + provided + + + org.apache.flink + flink-streaming-java_${alink.scala.major.version} + ${flink.version} + provided + + + org.apache.flink + flink-streaming-scala_${alink.scala.major.version} + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge_${alink.scala.major.version} + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + org.apache.flink + flink-table-planner_${alink.scala.major.version} + ${flink.version} + provided + + + + io.pravega + pravega-connectors-flink-${alink.flink.major.version}_2.12 + ${pravega.version} + + + \ No newline at end of file diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java new file mode 100644 index 000000000..8ecc28c1d --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/sink/PravegaSinkBatchOp.java @@ -0,0 +1,103 @@ +package com.alibaba.alink.operator.batch.sink; + + +import com.alibaba.alink.common.MLEnvironmentFactory; +import com.alibaba.alink.common.io.annotations.AnnotationUtils; +import com.alibaba.alink.common.io.annotations.IOType; +import com.alibaba.alink.common.io.annotations.IoOpAnnotation; +import com.alibaba.alink.operator.batch.BatchOperator; +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import com.alibaba.alink.operator.common.io.pravega.PravegaCsvRowSerializationSchema; +import com.alibaba.alink.operator.common.io.pravega.PravegaJsonRowSerializationSchema; +import com.alibaba.alink.params.io.PravegaSinkParams; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.connectors.flink.FlinkPravegaOutputFormat; +import io.pravega.connectors.flink.PravegaConfig; +import io.pravega.connectors.flink.PravegaEventRouter; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.types.Row; + +import java.net.URI; +import java.util.Collection; + + +@IoOpAnnotation(name = "pravega", ioType = IOType.SinkBatch) +public class PravegaSinkBatchOp extends BaseSinkBatchOp + implements PravegaSinkParams { + + + public PravegaSinkBatchOp() { + this(new Params()); + } + + + public PravegaSinkBatchOp(Params params) { + super(AnnotationUtils.annotatedName(PravegaSinkBatchOp.class), params); + } + + @Override + protected PravegaSinkBatchOp sinkFrom(BatchOperator in) { + final String pravegaControllerUri = getPravegaControllerUri(); + final String pravegaScope = getPravegaScope(); + final String pravegaStream = getPravegaStream(); + final Boolean PravegaStandalone = getPravegaStandalone(); + final String dataFormat = getDataFormat(); + final String[] colNames = in.getColNames(); + final String fieldDelimiter = CsvUtil.unEscape(getFieldDelimiter()); + + System.out.println(in.collect().toString()); + + PravegaEventRouter router = new PravegaEventRouter() { + @Override + public String getRoutingKey(Row eventType) { + final String pravegaRoutingKey = getPravegaRoutingKey(); + return pravegaRoutingKey; + } + }; + + PravegaConfig pravegaConfig = PravegaConfig.fromDefaults() + .withControllerURI(URI.create(pravegaControllerUri)) + .withDefaultScope(pravegaScope) + //Enable it if with Nautilus + //.withCredentials(credentials) + .withHostnameValidation(false); + + + //Create Pravega Scope and Stream + Stream stream = pravegaConfig.resolve(pravegaStream); + try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) { + // create the requested scope (if standalone) + if (PravegaStandalone) { + streamManager.createScope(pravegaScope); + } + // create the requested stream based on the given stream configuration + streamManager.createStream(stream.getScope(), stream.getStreamName(), StreamConfiguration.builder().build()); + } + + SerializationSchema serializationSchema = null; + //serializationSchema = new JsonRowSerializationSchema(colNames); + if (dataFormat.equalsIgnoreCase("csv")) { + serializationSchema = new PravegaCsvRowSerializationSchema(fieldDelimiter); + } else if (dataFormat.equalsIgnoreCase("json")) { + serializationSchema = new PravegaJsonRowSerializationSchema(colNames); + } + + + FlinkPravegaOutputFormat outputFormat = FlinkPravegaOutputFormat.builder() + .forStream(pravegaStream) + .withPravegaConfig(pravegaConfig) + .withSerializationSchema(serializationSchema) + .withEventRouter(router) + .build(); + + Collection inputData = in.collect(); + ExecutionEnvironment env = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment(); + + env.fromCollection(inputData).output(outputFormat).name("pravega_" + pravegaScope + "/" + pravegaStream); + return this; + } +} diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java new file mode 100644 index 000000000..e1aa16d71 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/batch/source/PravegaSourceBatchOp.java @@ -0,0 +1,77 @@ +package com.alibaba.alink.operator.batch.source; + +import com.alibaba.alink.common.MLEnvironmentFactory; +import com.alibaba.alink.common.io.annotations.AnnotationUtils; +import com.alibaba.alink.common.io.annotations.IOType; +import com.alibaba.alink.common.io.annotations.IoOpAnnotation; +import com.alibaba.alink.common.utils.DataSetConversionUtil; +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import com.alibaba.alink.operator.common.io.pravega.PravegaRowDeserializationSchema; +import com.alibaba.alink.params.io.PravegaSourceParams; +import io.pravega.client.ClientConfig; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.ReaderGroup; +import io.pravega.client.stream.StreamCut; +import io.pravega.connectors.flink.FlinkPravegaInputFormat; +import io.pravega.connectors.flink.PravegaConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.table.api.Table; +import org.apache.flink.types.Row; + +import java.net.URI; + +@IoOpAnnotation(name = "pravega", ioType = IOType.SourceBatch) +public final class PravegaSourceBatchOp extends BaseSourceBatchOp + implements PravegaSourceParams { + + public PravegaSourceBatchOp() { + this(new Params()); + } + + + public PravegaSourceBatchOp(Params params) { + super(AnnotationUtils.annotatedName(PravegaSourceBatchOp.class), params); + } + + @Override + protected Table initializeDataSource() { + String pravegaControllerUri = getPravegaControllerUri(); + String pravegascope = getPravegaScope(); + String pravegaStream = getPravegaStream(); + StreamCut pravegaStartStreamCut = StreamCut.from(getPravegaStartStreamCut()); + StreamCut pravegaEndStreamCut = StreamCut.from(getPravegaEndStreamCut()); + String schemaStr = "event String"; + //DeserializationSchema deserializationSchema = getPravegaDeserializer(); + final String[] colNames = CsvUtil.getColNames(schemaStr); + final TypeInformation[] colTypes = CsvUtil.getColTypes(schemaStr); + + + //Properties props = new Properties(); + //props.setProperty("pravegaControllerUri", pravegaControllerUri); + //props.setProperty("pravegascope", pravegascope); + + + PravegaConfig pravegaConfig = PravegaConfig.fromDefaults() + .withControllerURI(URI.create(pravegaControllerUri)) + .withDefaultScope(pravegascope) + //Enable it if with Nautilus + //.withCredentials(credentials) + .withHostnameValidation(false); + + + FlinkPravegaInputFormat source = FlinkPravegaInputFormat.builder() + .withPravegaConfig(pravegaConfig) + .forStream(pravegaStream, pravegaStartStreamCut, pravegaEndStreamCut) + .withDeserializationSchema(new PravegaRowDeserializationSchema(Row.class)) + .build(); + + ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment(); + + DataSource data = execEnv.createInput(source, TypeInformation.of(Row.class)).name("Pravega BatchReader"); + + return DataSetConversionUtil.toTable(getMLEnvironmentId(), data, colNames, colTypes); + } +} diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java new file mode 100644 index 000000000..22b4b9422 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaCsvRowSerializationSchema.java @@ -0,0 +1,36 @@ +package com.alibaba.alink.operator.common.io.pravega; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.types.Row; + + +public class PravegaCsvRowSerializationSchema implements SerializationSchema { + + private final String fieldDelimiter; + + public PravegaCsvRowSerializationSchema(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + } + + @Override + public byte[] serialize(Row row) { + StringBuilder sbd = new StringBuilder(); + int n = row.getArity(); + for (int i = 0; i < n; i++) { + Object obj = row.getField(i); + if (obj != null) { + sbd.append(obj); + } + if (i != n - 1) { + sbd.append(this.fieldDelimiter); + } + } + String str = sbd.toString(); + + try { + return str.getBytes("UTF-8"); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize row", e); + } + } +} \ No newline at end of file diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java new file mode 100644 index 000000000..838dcff4c --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaJsonRowSerializationSchema.java @@ -0,0 +1,34 @@ +package com.alibaba.alink.operator.common.io.pravega; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.types.Row; + +import java.util.HashMap; + +import static com.alibaba.alink.common.utils.JsonConverter.gson; + +public class PravegaJsonRowSerializationSchema implements SerializationSchema { + + String[] colNames; + + public PravegaJsonRowSerializationSchema(String[] colNames) { + this.colNames = colNames; + } + + @Override + public byte[] serialize(Row row) { + HashMap map = new HashMap<>(); + for (int i = 0; i < colNames.length; i++) { + Object obj = row.getField(i); + if (obj != null) { + map.put(colNames[i], obj); + } + } + String str = gson.toJson(map); + try { + return str.getBytes("UTF-8"); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize row", e); + } + } +} \ No newline at end of file diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java new file mode 100644 index 000000000..355fbee76 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/common/io/pravega/PravegaRowDeserializationSchema.java @@ -0,0 +1,34 @@ +package com.alibaba.alink.operator.common.io.pravega; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.nio.charset.Charset; + +public class PravegaRowDeserializationSchema implements DeserializationSchema { + + private Class valueType; + + public PravegaRowDeserializationSchema(Class valueType) { + this.valueType = valueType; + } + + @Override + public T deserialize(byte[] event) throws IOException { + Row row = new Row(1); + row.setField(0, event != null ? new String(event, Charset.forName("UTF-8")) : null); + return (T) row; + } + + @Override + public boolean isEndOfStream(T nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(valueType); + } +} \ No newline at end of file diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java new file mode 100644 index 000000000..36f504719 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/sink/PravegaSinkStreamOp.java @@ -0,0 +1,116 @@ +package com.alibaba.alink.operator.stream.sink; + + +import com.alibaba.alink.common.io.annotations.AnnotationUtils; +import com.alibaba.alink.common.io.annotations.IOType; +import com.alibaba.alink.common.io.annotations.IoOpAnnotation; +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import com.alibaba.alink.operator.common.io.pravega.PravegaCsvRowSerializationSchema; +import com.alibaba.alink.operator.common.io.pravega.PravegaJsonRowSerializationSchema; +import com.alibaba.alink.operator.stream.StreamOperator; +import com.alibaba.alink.params.io.PravegaSinkParams; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.connectors.flink.*; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.types.Row; + +import java.net.URI; + + +/** + * Data Sink for pravega. + */ +@IoOpAnnotation(name = "pravega", hasTimestamp = true, ioType = IOType.SinkStream) +public class PravegaSinkStreamOp extends BaseSinkStreamOp + implements PravegaSinkParams { + + public PravegaSinkStreamOp() { + this(new Params()); + } + + + public PravegaSinkStreamOp(Params params) { + super(AnnotationUtils.annotatedName(PravegaSinkStreamOp.class), params); + } + + @Override + protected PravegaSinkStreamOp sinkFrom(StreamOperator in) { + final String pravegaControllerUri = getPravegaControllerUri(); + final String pravegaScope = getPravegaScope(); + final String pravegaStream = getPravegaStream(); + final Boolean PravegaStandalone = getPravegaStandalone(); + final String dataFormat = getDataFormat(); + final String[] colNames = in.getColNames(); + final String fieldDelimiter = CsvUtil.unEscape(getFieldDelimiter()); + + System.out.println(in.getDataStream()); + + //Fetch the Pravega writer mode + PravegaWriterMode writerMode = null; + final String pravegawritemode = getPravegaWriterMode(); + switch (pravegawritemode.toUpperCase()) { + case ("ATLEAST_ONCE"): + writerMode = PravegaWriterMode.ATLEAST_ONCE; + break; + case ("EXACTLY_ONCE"): + writerMode = PravegaWriterMode.EXACTLY_ONCE; + break; + case ("BEST_EFFORT"): + writerMode = PravegaWriterMode.BEST_EFFORT; + break; + } + + //Generate the routing key + PravegaEventRouter router = new PravegaEventRouter() { + @Override + public String getRoutingKey(Row eventType) { + final String pravegaRoutingKey = getPravegaRoutingKey(); + return pravegaRoutingKey; + } + }; + + PravegaConfig pravegaConfig = PravegaConfig.fromDefaults() + .withControllerURI(URI.create(pravegaControllerUri)) + .withDefaultScope(pravegaScope) + //Enable it if with Nautilus + //.withCredentials(credentials) + .withHostnameValidation(false); + + + //Create Pravega Scope and Stream in case it does not exist. + Stream stream = pravegaConfig.resolve(pravegaStream); + try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) { + // create the requested scope (if standalone) + if (PravegaStandalone) { + streamManager.createScope(pravegaScope); + } + // create the requested stream based on the given stream configuration + streamManager.createStream(stream.getScope(), stream.getStreamName(), StreamConfiguration.builder().build()); + } + + SerializationSchema serializationSchema = null; + //serializationSchema = new JsonRowSerializationSchema(colNames); + if (dataFormat.equalsIgnoreCase("csv")) { + serializationSchema = new PravegaCsvRowSerializationSchema(fieldDelimiter); + } else if (dataFormat.equalsIgnoreCase("json")) { + serializationSchema = new PravegaJsonRowSerializationSchema(colNames); + } + + FlinkPravegaWriter pravegaSink = FlinkPravegaWriter.builder() + .forStream(pravegaStream) + .withPravegaConfig(pravegaConfig) + .withSerializationSchema(serializationSchema) + .withEventRouter(router) + .withWriterMode(writerMode) + .build(); + + DataStream inputData = in.getDataStream(); + inputData.addSink(pravegaSink).name("pravega_" + pravegaScope + "/" + pravegaStream); + + return this; + } +} diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java new file mode 100644 index 000000000..3c07b93e3 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/operator/stream/source/PravegaSourceStreamOp.java @@ -0,0 +1,87 @@ +package com.alibaba.alink.operator.stream.source; + +import com.alibaba.alink.common.MLEnvironmentFactory; +import com.alibaba.alink.common.io.annotations.AnnotationUtils; +import com.alibaba.alink.common.io.annotations.IOType; +import com.alibaba.alink.common.io.annotations.IoOpAnnotation; +import com.alibaba.alink.common.utils.DataStreamConversionUtil; +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import com.alibaba.alink.operator.common.io.pravega.PravegaRowDeserializationSchema; +import com.alibaba.alink.params.io.PravegaSourceParams; +import io.pravega.client.ClientConfig; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.StreamCut; +import io.pravega.connectors.flink.FlinkPravegaReader; +import io.pravega.connectors.flink.PravegaConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.types.Row; + +import java.net.URI; +import java.util.Properties; + +/** + * Data source for pravega. + */ +@IoOpAnnotation(name = "pravega", hasTimestamp = true, ioType = IOType.SourceStream) +public class PravegaSourceStreamOp extends BaseSourceStreamOp + implements PravegaSourceParams { + + public PravegaSourceStreamOp() { + this(new Params()); + } + + + public PravegaSourceStreamOp(Params params) { + super(AnnotationUtils.annotatedName(PravegaSourceStreamOp.class), params); + } + + @Override + protected Table initializeDataSource() { + String pravegaControllerUri = getPravegaControllerUri(); + String pravegascope = getPravegaScope(); + String pravegaStream = getPravegaStream(); + StreamCut pravegaStartStreamCut = null; + if (getPravegaStartStreamCut().equals("UNBOUNDED")) + { + ClientConfig clientConfig= ClientConfig.builder().controllerURI(URI.create(pravegaControllerUri)).build(); + StreamManager streamManager = StreamManager.create(clientConfig); + pravegaStartStreamCut = streamManager.getStreamInfo(pravegascope,pravegaStream).getTailStreamCut(); + } else { + pravegaStartStreamCut = StreamCut.from(getPravegaStartStreamCut()); + } + StreamCut pravegaEndStreamCut = StreamCut.from(getPravegaEndStreamCut()); + String schemaStr = "event String"; + final String[] colNames = CsvUtil.getColNames(schemaStr); + final TypeInformation[] colTypes = CsvUtil.getColTypes(schemaStr); + + + Properties props = new Properties(); + props.setProperty("pravegaControllerUri", pravegaControllerUri); + props.setProperty("pravegascope", pravegascope); + + + PravegaConfig pravegaConfig = PravegaConfig.fromDefaults() + .withControllerURI(URI.create(pravegaControllerUri)) + .withDefaultScope(pravegascope) + //Enable it if with Nautilus + //.withCredentials(credentials) + .withHostnameValidation(false); + + /*Pravega pravega = new Pravega(); + pravega.tableSourceReaderBuilder() + .forStream(pravegaStream)*/ + + FlinkPravegaReader source = FlinkPravegaReader.builder() + .withPravegaConfig(pravegaConfig) + .forStream(pravegaStream, pravegaStartStreamCut, pravegaEndStreamCut) + .withDeserializationSchema(new PravegaRowDeserializationSchema(Row.class)) + .build(); + + DataStream data = MLEnvironmentFactory.get(getMLEnvironmentId()).getStreamExecutionEnvironment() + .addSource(source).name("Pravega StreamReader"); + return DataStreamConversionUtil.toTable(getMLEnvironmentId(), data, colNames, colTypes); + } +} diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java new file mode 100644 index 000000000..229788d96 --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSinkParams.java @@ -0,0 +1,118 @@ +package com.alibaba.alink.params.io; + +import com.alibaba.alink.params.io.HasFieldDelimiterDvComma; +import com.alibaba.alink.params.io.shared_params.HasDataFormat; +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.ParamInfoFactory; +import org.apache.flink.ml.api.misc.param.WithParams; + +public interface PravegaSinkParams extends WithParams, HasDataFormat, HasFieldDelimiterDvComma { + + /** + * Parameter of pravega controller uri + **/ + ParamInfo PRAVEGA_CONTROLLER_URI = ParamInfoFactory + .createParamInfo("pravega_controller_uri", String.class) + .setDescription("pravega controller uri") + .setRequired() + .setAlias(new String[]{"pravega.controllerUri", "pravega_controller_uri"}) + .build(); + /** + * Parameter of pravega scope + **/ + ParamInfo PRAVEGA_SCOPE = ParamInfoFactory + .createParamInfo("pravega_scope", String.class) + .setDescription("pravega scope") + .setRequired() + .setAlias(new String[]{"pravega.scope", "pravega_scope"}) + .build(); + ParamInfo PRAVEGA_STREAM = ParamInfoFactory + .createParamInfo("pravega_stream", String.class) + .setDescription("pravega stream") + .setRequired() + .setAlias(new String[]{"pravega.stream", "pravega_stream"}) + .build(); + /** + * Parameter of pravega writer mode, only be used by the PravegaSinkStreamOp + **/ + ParamInfo PRAVEGA_WRITER_MODE = ParamInfoFactory + .createParamInfo("pravega_writer_mode", String.class) + .setDescription("pravega writer mode") + .setOptional() + .setHasDefaultValue("ATLEAST_ONCE") + .setAlias(new String[]{"pravega.writerMode", "pravega_writer_mode"}) + .build(); + /** + * Parameter of pravega standalone indicator + **/ + ParamInfo PRAVEGA_ROUTING_KEY = ParamInfoFactory + .createParamInfo("pravega_routing_key", String.class) + .setDescription("pravega routing key") + .setOptional() + .setHasDefaultValue("default") + .setAlias(new String[]{"pravega.routingKey", "pravega_routing_key"}) + .build(); + /** + * Parameter of pravega standalone indicator + **/ + ParamInfo PRAVEGA_STANDALONE = ParamInfoFactory + .createParamInfo("PRAVEGA_STANDALONE", Boolean.class) + .setDescription("pravega standalone") + .setOptional() + .setHasDefaultValue(true) + .setAlias(new String[]{"pravega.standalone", "pravega_standalone"}) + .build(); + + default String getPravegaControllerUri() { + return get(PRAVEGA_CONTROLLER_URI); + } + + default T setPravegaControllerUri(String value) { + return set(PRAVEGA_CONTROLLER_URI, value); + } + + default String getPravegaScope() { + return get(PRAVEGA_SCOPE); + } + + /** + * Parameter of pravega stream + **/ + default T setPravegaScope(String value) { + return set(PRAVEGA_SCOPE, value); + } + + default String getPravegaStream() { + return get(PRAVEGA_STREAM); + } + + default T setPravegaStream(String value) { + return set(PRAVEGA_STREAM, value); + } + + default String getPravegaWriterMode() { + return get(PRAVEGA_WRITER_MODE); + } + + default T setPravegaPravegaWriterMode(String value) { + return set(PRAVEGA_WRITER_MODE, value); + } + + default String getPravegaRoutingKey() { + return get(PRAVEGA_ROUTING_KEY); + } + + default T setPravegaRoutingKey(String value) { + return set(PRAVEGA_ROUTING_KEY, value); + } + + default Boolean getPravegaStandalone() { + return get(PRAVEGA_STANDALONE); + } + + default T setPravegaStandalone(Boolean value) { + return set(PRAVEGA_STANDALONE, value); + } + + +} diff --git a/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java new file mode 100644 index 000000000..347bb22ff --- /dev/null +++ b/connectors/connector-pravega/src/main/java/com/alibaba/alink/params/io/PravegaSourceParams.java @@ -0,0 +1,97 @@ +package com.alibaba.alink.params.io; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.ParamInfoFactory; +import org.apache.flink.ml.api.misc.param.WithParams; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +public interface PravegaSourceParams extends WithParams { + + ParamInfo PRAVEGA_CONTROLLER_URI = ParamInfoFactory + .createParamInfo("pravega_controller_uri", String.class) + .setDescription("pravega controller uri") + .setRequired() + .setAlias(new String[]{"pravega.controller.uri", "pravega_controller_uri"}) + .build(); + ParamInfo PRAVEGA_SCOPE = ParamInfoFactory + .createParamInfo("pravega_scope", String.class) + .setDescription("pravega scope") + .setRequired() + .setAlias(new String[]{"pravega.scope", "pravega_scope"}) + .build(); + ParamInfo PRAVEGA_STREAM = ParamInfoFactory + .createParamInfo("pravega_stream", String.class) + .setDescription("pravega stream") + .setRequired() + .setAlias(new String[]{"pravega.stream", "pravega_stream"}) + .build(); + ParamInfo pravega_deserializer = ParamInfoFactory + .createParamInfo("pravega_deserializer ", DeserializationSchema.class) + .setDescription("pravega deserializer") + .setRequired() + .setAlias(new String[]{"pravega.deserializer", "pravega_deserializer"}) + .build(); + ParamInfo pravega_startStreamCut = ParamInfoFactory + .createParamInfo("pravega_startStreamCut ", String.class) + .setDescription("pravega startStreamCut") + .setHasDefaultValue("UNBOUNDED") + .setOptional() + .setAlias(new String[]{"pravega.startStreamCut", "pravega_startStreamCut"}) + .build(); + ParamInfo pravega_endStreamCut = ParamInfoFactory + .createParamInfo("pravega_endtreamCut ", String.class) + .setDescription("pravega endStreamCut") + .setHasDefaultValue("UNBOUNDED") + .setOptional() + .setAlias(new String[]{"pravega.endStreamCut", "pravega_endtreamCut"}) + .build(); + + default String getPravegaControllerUri() { + return get(PRAVEGA_CONTROLLER_URI); + } + + default T setPravegaControllerUri(String value) { + return set(PRAVEGA_CONTROLLER_URI, value); + } + + default String getPravegaScope() { + return get(PRAVEGA_SCOPE); + } + + default T setPravegaScope(String value) { + return set(PRAVEGA_SCOPE, value); + } + + default String getPravegaStream() { + return get(PRAVEGA_STREAM); + } + + default T setPravegaStream(String value) { + return set(PRAVEGA_STREAM, value); + } + + default DeserializationSchema getPravegaDeserializer() { + return get(pravega_deserializer); + } + + default T setPravegaDeserializer(DeserializationSchema value) { + return set(pravega_deserializer, value); + } + + default String getPravegaStartStreamCut() { + return get(pravega_startStreamCut); + } + + default T setPravegaStartStreamCut(String value) { + return set(pravega_startStreamCut, value); + } + + default String getPravegaEndStreamCut() { + return get(pravega_endStreamCut); + } + + default T setPravegaEndStreamCut(String value) { + return set(pravega_endStreamCut, value); + } + +} diff --git a/connectors/pom.xml b/connectors/pom.xml index c903e1997..185d6f3cc 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -5,7 +5,7 @@ alink com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 @@ -19,6 +19,7 @@ connector-kafka-0.10 connector-kafka-0.11 connector-kafka + connector-pravega diff --git a/core/pom.xml b/core/pom.xml index a0f1291c8..352421413 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ alink com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 diff --git a/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java b/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java index de8cda260..c82ad814c 100644 --- a/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java +++ b/core/src/main/java/com/alibaba/alink/common/MLEnvironment.java @@ -101,32 +101,17 @@ public MLEnvironment( * if the ExecutionEnvironment has not been set, it initial the ExecutionEnvironment * with default Configuration. * - * After flink 1.10, The memory configuration is unified(FLIP-49), and the configuration of - * managed memory and network memory can only be set by 'taskmanager.memory.managed.size' - * and 'taskmanager.memory.network.min' in local execution environment. - * - * @see org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils#adjustForLocalExecution - * * @return the batch {@link ExecutionEnvironment} */ public ExecutionEnvironment getExecutionEnvironment() { if (null == env) { if (ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { - final int managedMemPerCoreInMB = 64; - final int networkMemPerCoreInMB = 64; - final int core = Runtime.getRuntime().availableProcessors(); - Configuration conf = new Configuration(); - conf.setString( - "taskmanager.memory.managed.size", - String.format("%dm", managedMemPerCoreInMB * core) - ); - conf.setString( - "taskmanager.memory.network.min", - String.format("%dm", networkMemPerCoreInMB * core) - ); + conf.setBoolean("taskmanager.memory.preallocate", true); + conf.setBoolean("taskmanager.memory.off-heap", true); + conf.setFloat("taskmanager.memory.fraction", 0.3f); env = ExecutionEnvironment.createLocalEnvironment(conf); - env.setParallelism(core); + env.setParallelism(Runtime.getRuntime().availableProcessors()); } else { env = ExecutionEnvironment.getExecutionEnvironment(); } diff --git a/examples/pom.xml b/examples/pom.xml index d5231a004..0193eae86 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -5,7 +5,7 @@ alink com.alibaba.alink - 1.1-SNAPSHOT + 1.1.0 4.0.0 @@ -43,6 +43,12 @@ flink-table-planner_${alink.scala.major.version} ${flink.version} + + com.alibaba.alink + alink_connectors_pravega_flink-1.9_2.11 + 1.1.0 + compile + diff --git a/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java b/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java new file mode 100644 index 000000000..e2cdf1032 --- /dev/null +++ b/examples/src/main/java/io/pravega/alink/PravegaLinearRegressionExample.java @@ -0,0 +1,165 @@ +package io.pravega.alink; + +import com.alibaba.alink.operator.batch.BatchOperator; +import com.alibaba.alink.operator.batch.dataproc.AppendIdBatchOp; +import com.alibaba.alink.operator.batch.dataproc.JsonValueBatchOp; +import com.alibaba.alink.operator.batch.regression.LinearRegTrainBatchOp; +import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp; +import com.alibaba.alink.operator.batch.sink.PravegaSinkBatchOp; +import com.alibaba.alink.operator.batch.source.PravegaSourceBatchOp; +import com.alibaba.alink.operator.common.linear.LinearModelType; +import com.alibaba.alink.operator.stream.dataproc.JsonValueStreamOp; +import com.alibaba.alink.pipeline.regression.LinearRegression; +import com.alibaba.alink.operator.stream.StreamOperator; +import com.alibaba.alink.operator.stream.source.PravegaSourceStreamOp; +import com.alibaba.alink.pipeline.regression.LinearRegressionModel; +import io.pravega.alink.common.CommonParams; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + + +public class PravegaLinearRegressionExample { + + private static final Logger LOG = LoggerFactory.getLogger(PravegaLinearRegressionExample.class); + public final String scope; + public final String streamName; + public final String controllerURI; + + + public PravegaLinearRegressionExample(String scope, String streamName, String controllerURI) { + this.scope = scope; + this.streamName = streamName; + this.controllerURI = controllerURI; + } + + public PravegaLinearRegressionExample PravegaStreamML(String scope, String streamName, String controllerUri) throws Exception { + PravegaSourceStreamOp source = + new PravegaSourceStreamOp() + .setPravegaControllerUri(controllerUri) + .setPravegaScope(scope) + .setPravegaStream(streamName); + + + StreamOperator data = source + .link ( + new JsonValueStreamOp() + .setSelectedCol("event").setReservedCols(new String[] {}) + .setOutputCols( + new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"} + ) + .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"}) + ) + .select("CAST(DateTime AS TIMESTAMP) AS DateTime" + + "CAST(PDP2GGERDJKS01000000_2 AS FLOAT) AS PDP2GGERDJKS01000000_2" + + "CAST(PDP2GHDEDJKS01000017_2 AS FLOAT) AS PDP2GHDEDJKS01000017_2" + ); + + //LinearRegPredictStreamOp LR = new LinearRegPredictStreamOp(data ) + System.out.print(data.getSchema()); + data.print(); + StreamOperator.execute(); + return this; + } + + + public PravegaLinearRegressionExample PravegaBatchmML(String scope, String streamName, String controllerUri) throws Exception { + + String labelColName = "PDP2GGERDJKS01000000_2"; + String[] selectedColNames = new String[] {"append_id"}; + + + //Pravega Source Batch Op + System.out.println("Starting to read the dataset from Pravega........"); + PravegaSourceBatchOp source = + new PravegaSourceBatchOp() + .setPravegaControllerUri(controllerUri) + .setPravegaScope(scope) + .setPravegaStream(streamName); + + + BatchOperator data = source.link ( + new JsonValueBatchOp() + .setSelectedCol("event") + .setReservedCols(new String[] {}) + .setOutputCols( + new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"} + ) + .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"}) + ) + .select("CAST(DateTime AS TIMESTAMP) AS DateTime, " + + "CAST(PDP2GGERDJKS01000000_2 AS FLOAT) AS PDP2GGERDJKS01000000_2, " + + "CAST(PDP2GHDEDJKS01000017_2 AS FLOAT) AS PDP2GHDEDJKS01000017_2"); + + System.out.print("Orignal Table Schema" + data.getSchema()); + data.print(); + + + BatchOperator pravegadata = data.link(new AppendIdBatchOp()); + + System.out.print("pravegadata Table Schema" + pravegadata.getSchema()); + pravegadata.print(); + + //Single Linear Regression Example + System.out.println("Starting to run the Linear Regression on the original dataset........"); + LinearRegression lrop = new LinearRegression() + .setLabelCol(labelColName) + .setFeatureCols(selectedColNames) + .setPredictionCol("pre"); + + LinearRegTrainBatchOp LRTrain = new LinearRegTrainBatchOp() + .setFeatureCols(selectedColNames) + .setLabelCol(labelColName); + + + List dataSet = lrop.fit(pravegadata).transform(pravegadata).collect(); + + String[] a = dataSet.get(0).toString().split(","); + String[] b = dataSet.get(dataSet.size()-1).toString().split(","); + + Double diff = ((Double.parseDouble(b[b.length-1]))-(Double.parseDouble(a[a.length-1])))/Double.parseDouble(a[a.length-1]); + System.out.println(a[a.length-1]); + System.out.println(b[b.length-1]); + System.out.println(diff); + + // Linear Regression model sink to Pravega + BatchOperator LRModel = pravegadata.link(LRTrain).link (new PravegaSinkBatchOp() + .setDataFormat("csv") + .setPravegaControllerUri("tcp://localhost:9090") + .setPravegaScope("workshop-samples") + .setPravegaStream("stream")); + BatchOperator.execute(); + + PravegaSourceBatchOp newsource = + new PravegaSourceBatchOp() + .setPravegaControllerUri("tcp://localhost:9090") + .setPravegaScope("workshop-samples") + .setPravegaStream("stream"); + System.out.print("Pravega Table Schema" + newsource.getSchema()); + newsource.print(); + + + + return this; + + } + + public static void main(String[] args) throws Exception { + LOG.info("########## FlinkSQLJOINReader START #############"); + CommonParams.init(args); + String scope = CommonParams.getParam("pravega_scope"); + String streamName = CommonParams.getParam("stream_name"); + String controllerURI = CommonParams.getParam("pravega_controller_uri"); + + LOG.info("####################### SCOPE ###################### " + scope); + LOG.info("####################### streamName ###################### " + streamName); + LOG.info("####################### controllerURI ###################### " + controllerURI); + PravegaLinearRegressionExample PravegaLinearRegressionExample = new PravegaLinearRegressionExample (scope, streamName, controllerURI); + PravegaLinearRegressionExample.PravegaBatchmML(scope, streamName, controllerURI); + } +} + diff --git a/examples/src/main/java/io/pravega/alink/common/CommonParams.java b/examples/src/main/java/io/pravega/alink/common/CommonParams.java new file mode 100644 index 000000000..e2920d0ae --- /dev/null +++ b/examples/src/main/java/io/pravega/alink/common/CommonParams.java @@ -0,0 +1,116 @@ +package io.pravega.alink.common; + +import org.apache.flink.api.java.utils.ParameterTool; + +import java.net.URI; + +// All parameters will come from environment variables. This makes it easy +// to configure on Docker, Mesos, Kubernetes, etc. +public class CommonParams { + // By default, we will connect to a standalone Pravega running on localhost. + public static URI getControllerURI() { + return URI.create(getEnvVar("PRAVEGA_CONTROLLER_URI", "tcp://localhost:9090")); + } + + public static boolean isPravegaStandalone() { + return Boolean.parseBoolean(getEnvVar("pravega_standalone", "true")); + } + + public static String getUser() { + return getEnvVar("PRAVEGA_STANDALONE_USER", "admin"); + } + + public static String getPassword() { + return getEnvVar("PRAVEGA_STANDALONE_PASSWORD", "1111_aaaa"); + } + + public static String getScope() { + return getEnvVar("PRAVEGA_SCOPE", "workshop-samples"); + } + + public static String getStreamName() { + return getEnvVar("STREAM_NAME", "workshop-stream"); + } + + public static String getRoutingKeyAttributeName() { + return getEnvVar("ROUTING_KEY_ATTRIBUTE_NAME", "test"); + } + + public static String getMessage() { + return getEnvVar("MESSAGE", "This is Nautilus OE team workshop samples."); + } + + private static String getEnvVar(String name, String defaultValue) { + String value = System.getenv(name); + if (value == null || value.isEmpty()) { + return defaultValue; + } + return value; + } + + public static URI getGatewayURI() { + return URI.create(getEnvVar("GATEWAY_URI", "http://0.0.0.0:3000/")); + } + + public static int getTargetRateEventsPerSec() { + return Integer.parseInt(getEnvVar("PRAVEGA_TARGET_RATE_EVENTS_PER_SEC", "100")); + } + + public static int getScaleFactor() { + return Integer.parseInt(getEnvVar("PRAVEGA_SCALE_FACTOR", "2")); + } + + public static int getMinNumSegments() { + return Integer.parseInt(getEnvVar("PRAVEGA_MIN_NUM_SEGMENTS", "1")); + } + + public static int getListenPort() { + return Integer.parseInt(getEnvVar("LISTEN_PORT", "54672")); + } + + public static ParameterTool params = null; + + public static void init(String[] args) { + params = ParameterTool.fromArgs(args); + } + + public static String getParam(String key) { + if (params != null && params.has(key)) { + return params.get(key); + } else { + return getDefaultParam(key); + } + } + + private static String getDefaultParam(String key) { + String keyValue = null; + if (key != null) { + switch (key) { + case "pravega_scope": + keyValue = "workshop-samples"; + break; + case "stream_name": + keyValue = "workshop-stream"; + break; + case "pravega_controller_uri": + keyValue = "tcp://localhost:9090"; + break; + case "routing_key_attribute_name": + keyValue = "100"; + break; + case "pravega_standalone": + keyValue = "true"; + break; + case "data_file": + keyValue = "initial_data_result_nozero.csv"; + break; + case "message": + keyValue = "To demonstrate Nautilus streams sending a string message"; + break; + default: + keyValue = null; + } + } + return keyValue; + } +} \ No newline at end of file diff --git a/examples/src/main/java/io/pravega/alink/test_readFromCsv.java b/examples/src/main/java/io/pravega/alink/test_readFromCsv.java new file mode 100644 index 000000000..72888eb78 --- /dev/null +++ b/examples/src/main/java/io/pravega/alink/test_readFromCsv.java @@ -0,0 +1,34 @@ +package io.pravega.alink; + +import com.alibaba.alink.operator.batch.BatchOperator; +import com.alibaba.alink.operator.batch.dataproc.AppendIdBatchOp; +import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp; +import com.alibaba.alink.pipeline.regression.LinearRegression; + + +public class test_readFromCsv { + public static void main(String[] args) throws Exception { + + String csvSchema = "time Timestamp ,PDP2GGERDJKS01000000_2 Double, PDP2GHDEDJKS01000017_2 Double"; + CsvSourceBatchOp csvSource = new CsvSourceBatchOp().setFilePath("C:\\Sandbox\\initial_data_result_nozero_alink_test.csv").setIgnoreFirstLine(true).setSchemaStr(csvSchema); + + System.out.println("Orignal Table Schema........"); + System.out.print( csvSource.getSchema()); + csvSource.print(); + + + BatchOperator csvdata = csvSource.link(new AppendIdBatchOp()); + System.out.println("Table Schema after added AppendId........"); + csvdata.print(); + System.out.print( csvSource.getSchema()); + + LinearRegression lrop = new LinearRegression() + .setLabelCol("PDP2GGERDJKS01000000_2") + .setFeatureCols(new String[]{"append_id"}) + .setPredictionCol("pre"); + + lrop.fit(csvdata).transform(csvdata).print(); + } + +} + diff --git a/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java b/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java new file mode 100644 index 000000000..2fb04fb66 --- /dev/null +++ b/examples/src/main/java/io/pravega/alink/test_readWriteWithPravega.java @@ -0,0 +1,49 @@ +package io.pravega.alink; + + +import com.alibaba.alink.operator.batch.BatchOperator; +import com.alibaba.alink.operator.batch.dataproc.JsonValueBatchOp; +import com.alibaba.alink.operator.batch.sink.PravegaSinkBatchOp; +import com.alibaba.alink.operator.batch.source.PravegaSourceBatchOp; + + +public class test_readWriteWithPravega { + public static void main(String[] args) throws Exception { + + System.out.println("Starting to read the dataset from Pravega........"); + + PravegaSourceBatchOp source = + new PravegaSourceBatchOp() + .setPravegaControllerUri("tcp://localhost:9090") + .setPravegaScope("workshop-samples") + .setPravegaStream("workshop-stream"); + BatchOperator op = source.link ( + new JsonValueBatchOp() + .setSelectedCol("event") + .setReservedCols(new String[] {}) + .setOutputCols( + new String[] {"DateTime", "PDP2GGERDJKS01000000_2", "PDP2GHDEDJKS01000017_2"} + ) + .setJsonPath(new String[] {"$.DateTime", "$.PDP2GGERDJKS01000000_2", "$.PDP2GHDEDJKS01000017_2"}) + ); + System.out.print("Pravega Table Schema" + op.getSchema()); + + PravegaSinkBatchOp sink = new PravegaSinkBatchOp() + .setDataFormat("csv") + .setPravegaControllerUri("tcp://localhost:9090") + .setPravegaScope("workshop-samples") + .setPravegaStream("stream"); + sink.linkFrom(op); + BatchOperator.execute(); + + PravegaSourceBatchOp newsource = + new PravegaSourceBatchOp() + .setPravegaControllerUri("tcp://localhost:9090") + .setPravegaScope("workshop-samples") + .setPravegaStream("stream"); + System.out.print("Pravega Table Schema" + newsource.getSchema()); + newsource.print(); + + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index b916ebea2..7f053e90f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.alibaba.alink alink pom - 1.1-SNAPSHOT + 1.1.0 alink Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform. @@ -91,14 +91,15 @@ ${java.version} ${java.version} UTF-8 - 1.10 - 1.10.0 + 1.9 + 1.9.0 2.11 2.11.11 4.5.3 2.8.2 3.3.2 0.11.2 + 0.6.2