Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.1.0 flink 1.9 pravega #62

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectors/connector-kafka-0.10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.1-SNAPSHOT</version>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion connectors/connector-kafka-0.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.1-SNAPSHOT</version>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion connectors/connector-kafka-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.1-SNAPSHOT</version>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion connectors/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.1-SNAPSHOT</version>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
62 changes: 62 additions & 0 deletions connectors/connector-pravega/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the Alink Pravega connector (batch source/sink operators). -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>alink_connectors</artifactId>
<groupId>com.alibaba.alink</groupId>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>


<artifactId>alink_connectors_pravega_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
<name>alink-connector-pravega</name>

<packaging>jar</packaging>

<dependencies>
<!-- Alink core: provided, supplied by the runtime distribution. -->
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink APIs: all provided, matching the other connector modules. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${alink.scala.major.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- dependency on PRAVEGA -->
<!-- NOTE(review): the scala suffix is hardcoded to 2.12 here while the rest of
     the build parameterizes it as ${alink.scala.major.version}. Confirm whether
     pravega-connectors-flink is published only for scala 2.12; if not, this
     breaks builds that select a different scala version. -->
<dependency>
<groupId>io.pravega</groupId>
<artifactId>pravega-connectors-flink-${alink.flink.major.version}_2.12</artifactId>
<version>${pravega.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.alibaba.alink.operator.batch.sink;


import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import com.alibaba.alink.operator.common.io.pravega.PravegaCsvRowSerializationSchema;
import com.alibaba.alink.operator.common.io.pravega.PravegaJsonRowSerializationSchema;
import com.alibaba.alink.params.io.PravegaSinkParams;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.connectors.flink.FlinkPravegaOutputFormat;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaEventRouter;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.types.Row;

import java.net.URI;
import java.util.Collection;


@IoOpAnnotation(name = "pravega", ioType = IOType.SinkBatch)
public class PravegaSinkBatchOp extends BaseSinkBatchOp<PravegaSinkBatchOp>
        implements PravegaSinkParams<PravegaSinkBatchOp> {

    public PravegaSinkBatchOp() {
        this(new Params());
    }

    public PravegaSinkBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(PravegaSinkBatchOp.class), params);
    }

    /**
     * Writes the input batch to the configured Pravega stream.
     *
     * <p>Creates the scope (standalone deployments only) and the stream up front,
     * serializes each row as CSV or JSON depending on the {@code dataFormat}
     * parameter, and submits the collected rows through a
     * {@link FlinkPravegaOutputFormat}.
     *
     * @param in the batch operator whose rows are written out
     * @return this operator, for chaining
     * @throws IllegalArgumentException if {@code dataFormat} is neither "csv" nor "json"
     */
    @Override
    protected PravegaSinkBatchOp sinkFrom(BatchOperator in) {
        final String pravegaControllerUri = getPravegaControllerUri();
        final String pravegaScope = getPravegaScope();
        final String pravegaStream = getPravegaStream();
        final Boolean pravegaStandalone = getPravegaStandalone();
        final String dataFormat = getDataFormat();
        final String[] colNames = in.getColNames();
        final String fieldDelimiter = CsvUtil.unEscape(getFieldDelimiter());

        // Resolve the routing key once instead of re-reading the parameter for every event.
        final String pravegaRoutingKey = getPravegaRoutingKey();
        PravegaEventRouter<Row> router = new PravegaEventRouter<Row>() {
            @Override
            public String getRoutingKey(Row eventType) {
                return pravegaRoutingKey;
            }
        };

        PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
                .withControllerURI(URI.create(pravegaControllerUri))
                .withDefaultScope(pravegaScope)
                // Enable credentials here when running against Nautilus.
                //.withCredentials(credentials)
                .withHostnameValidation(false);

        // Create Pravega scope (standalone only) and stream before writing.
        Stream stream = pravegaConfig.resolve(pravegaStream);
        try (StreamManager streamManager = StreamManager.create(pravegaConfig.getClientConfig())) {
            if (pravegaStandalone) {
                streamManager.createScope(pravegaScope);
            }
            streamManager.createStream(stream.getScope(), stream.getStreamName(),
                    StreamConfiguration.builder().build());
        }

        SerializationSchema<Row> serializationSchema;
        if (dataFormat.equalsIgnoreCase("csv")) {
            serializationSchema = new PravegaCsvRowSerializationSchema(fieldDelimiter);
        } else if (dataFormat.equalsIgnoreCase("json")) {
            serializationSchema = new PravegaJsonRowSerializationSchema(colNames);
        } else {
            // Fail fast: the original left the schema null here, which surfaced
            // later as an opaque NPE inside the output-format builder.
            throw new IllegalArgumentException("Unsupported data format: " + dataFormat);
        }

        FlinkPravegaOutputFormat<Row> outputFormat = FlinkPravegaOutputFormat.<Row>builder()
                .forStream(pravegaStream)
                .withPravegaConfig(pravegaConfig)
                .withSerializationSchema(serializationSchema)
                .withEventRouter(router)
                .build();

        // Collect exactly once; the original also collect()ed a second time just
        // to print the dataset, doubling the job execution for debugging output.
        Collection<Row> inputData = in.collect();
        ExecutionEnvironment env = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
        env.fromCollection(inputData).output(outputFormat).name("pravega_" + pravegaScope + "/" + pravegaStream);
        return this;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.alibaba.alink.operator.batch.source;

import com.alibaba.alink.common.MLEnvironmentFactory;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.common.utils.DataSetConversionUtil;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import com.alibaba.alink.operator.common.io.pravega.PravegaRowDeserializationSchema;
import com.alibaba.alink.params.io.PravegaSourceParams;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.StreamCut;
import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.PravegaConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

import java.net.URI;

@IoOpAnnotation(name = "pravega", ioType = IOType.SourceBatch)
public final class PravegaSourceBatchOp extends BaseSourceBatchOp<PravegaSourceBatchOp>
        implements PravegaSourceParams<PravegaSourceBatchOp> {

    public PravegaSourceBatchOp() {
        this(new Params());
    }

    public PravegaSourceBatchOp(Params params) {
        super(AnnotationUtils.annotatedName(PravegaSourceBatchOp.class), params);
    }

    /**
     * Builds a batch table by reading the configured Pravega stream between the
     * configured start and end stream cuts. Every event is exposed as a single
     * String column named "event".
     *
     * @return a table with one String column containing the raw event payloads
     */
    @Override
    protected Table initializeDataSource() {
        final String controllerUri = getPravegaControllerUri();
        final String scope = getPravegaScope();
        final String streamName = getPravegaStream();
        final StreamCut startCut = StreamCut.from(getPravegaStartStreamCut());
        final StreamCut endCut = StreamCut.from(getPravegaEndStreamCut());

        // The whole event payload surfaces as one String column.
        final String schemaStr = "event String";
        final String[] colNames = CsvUtil.getColNames(schemaStr);
        final TypeInformation[] colTypes = CsvUtil.getColTypes(schemaStr);

        PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
                .withControllerURI(URI.create(controllerUri))
                .withDefaultScope(scope)
                // Add credentials here when running against Nautilus.
                .withHostnameValidation(false);

        FlinkPravegaInputFormat<Row> inputFormat = FlinkPravegaInputFormat.<Row>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(streamName, startCut, endCut)
                .withDeserializationSchema(new PravegaRowDeserializationSchema(Row.class))
                .build();

        ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
        DataSource<Row> rows = execEnv.createInput(inputFormat, TypeInformation.of(Row.class)).name("Pravega BatchReader");

        return DataSetConversionUtil.toTable(getMLEnvironmentId(), rows, colNames, colTypes);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.alibaba.alink.operator.common.io.pravega;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;


/**
 * Serializes a Flink {@link Row} into a delimiter-separated UTF-8 byte array
 * for writing to Pravega. Null fields are rendered as empty strings while the
 * delimiter is still emitted, so column positions stay stable.
 */
public class PravegaCsvRowSerializationSchema implements SerializationSchema<Row> {

    /** Delimiter inserted between consecutive fields in the CSV output. */
    private final String fieldDelimiter;

    public PravegaCsvRowSerializationSchema(String fieldDelimiter) {
        this.fieldDelimiter = fieldDelimiter;
    }

    /**
     * @param row the row to serialize; fields are rendered via {@code toString}
     * @return the UTF-8 encoded CSV line (no trailing delimiter, no newline)
     */
    @Override
    public byte[] serialize(Row row) {
        StringBuilder sbd = new StringBuilder();
        int n = row.getArity();
        for (int i = 0; i < n; i++) {
            Object obj = row.getField(i);
            if (obj != null) {
                sbd.append(obj);
            }
            if (i != n - 1) {
                sbd.append(this.fieldDelimiter);
            }
        }
        // The Charset overload cannot fail, unlike getBytes("UTF-8") which forced
        // the original to wrap a checked UnsupportedEncodingException.
        return sbd.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.alibaba.alink.operator.common.io.pravega;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;

import java.util.HashMap;

import static com.alibaba.alink.common.utils.JsonConverter.gson;

/**
 * Serializes a Flink {@link Row} into a UTF-8 JSON object keyed by column name.
 * Null fields are omitted from the output object.
 */
public class PravegaJsonRowSerializationSchema implements SerializationSchema<Row> {

    /** Column names used as JSON keys; position i maps to row field i. */
    private final String[] colNames;

    public PravegaJsonRowSerializationSchema(String[] colNames) {
        this.colNames = colNames;
    }

    /**
     * @param row the row to serialize; must have at least {@code colNames.length} fields
     * @return the UTF-8 encoded JSON object
     */
    @Override
    public byte[] serialize(Row row) {
        // LinkedHashMap keeps the emitted JSON keys in column order, making the
        // output deterministic across runs (HashMap iteration order is unspecified).
        java.util.Map<String, Object> map = new java.util.LinkedHashMap<>();
        for (int i = 0; i < colNames.length; i++) {
            Object obj = row.getField(i);
            if (obj != null) {
                map.put(colNames[i], obj);
            }
        }
        // The Charset overload cannot fail, unlike getBytes("UTF-8") which forced
        // the original to wrap a checked UnsupportedEncodingException.
        return gson.toJson(map).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.alibaba.alink.operator.common.io.pravega;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.nio.charset.Charset;

/**
 * Deserializes a raw Pravega event into a single-field {@link Row} whose only
 * field is the UTF-8 decoded payload.
 *
 * <p>NOTE(review): {@link #deserialize} always produces a {@code Row} regardless
 * of {@code T}, so this schema is only type-safe when {@code T == Row} (as used
 * by PravegaSourceBatchOp). A different {@code T} would fail with a
 * ClassCastException at the first downstream use.
 */
public class PravegaRowDeserializationSchema<T> implements DeserializationSchema<T> {

    /** Type token handed to Flink for produced-type information. */
    private final Class<T> valueType;

    public PravegaRowDeserializationSchema(Class<T> valueType) {
        this.valueType = valueType;
    }

    /**
     * @param event the raw event bytes; may be null
     * @return a one-field Row holding the UTF-8 string (or null for a null event)
     */
    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(byte[] event) throws IOException {
        Row row = new Row(1);
        // StandardCharsets.UTF_8 avoids the Charset.forName lookup on every event.
        row.setField(0, event == null ? null : new String(event, java.nio.charset.StandardCharsets.UTF_8));
        return (T) row;
    }

    /** Batch reads are bounded by stream cuts; there is no in-band end marker. */
    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(valueType);
    }
}
Loading