Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Introduce milvus sink connector #4885

Closed
wants to merge 21 commits into from
Closed
94 changes: 94 additions & 0 deletions docs/en/connector-v2/sink/Milvus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
> Milvus sink connector

## Support These Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
liugddx marked this conversation as resolved.
Show resolved Hide resolved
- [ ] [cdc](../../concept/connector-v2-features.md)

## Description

Write data to Apache milvus.

## Sink Options

| Name | Type | Required | Default | Description |
|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------|
| milvus_host | String | Yes | - | The milvus host. |
| milvus_port | Int | No | 19530 | This port is for gRPC. Default is 19530. |
| username | String | Yes | - | The username of milvus server. |
| password | String | Yes | - | The password of milvus server. |
| collection_name | String | No | - | A collection of milvus, which is similar to a table in a relational database. |
| partition_field | String | No | - | Partition fields, which must be included in the collection's schema. |
| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. Default is 'text-embedding-ada-002'. |
| openai_api_key | String | No | - | Use your own Open AI API Key here. |
| embeddings_fields | String | No | - | Fields to be embedded,They use`,`for splitting. |

### Data Type Mapping

| Milvus Data type | SeaTunnel Data type |
|------------------|---------------------|
| Bool | BOOLEAN |
| Int8 | TINYINT |
| Int16 | SMALLINT |
| Int32 | INT |
| Int64 | BIGINT |
| Float | FLOAT |
| Double | DOUBLE |
| VarChar | DECIMAL |
| String | STRING |

## Examples

```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
LocalFile {
schema {
fields {
bookID = string
title_1 = string
title_2 = string
}
}
path = "/tmp/milvus_test/book"
file_format_type = "csv"
}
}

transform {
}

sink {
Milvus {
milvus_host = localhost
milvus_port = 19530
username = root
password = Milvus
collection_name = title_db
openai_engine = text-embedding-ada-002
openai_api_key = sk-xxxx
embeddings_fields = title_2
}
}
```

## Changelog

### next version

- Add Milvus Sink Connector

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,4 @@ seatunnel.source.Rocketmq = connector-rocketmq
seatunnel.sink.Rocketmq = connector-rocketmq
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
seatunnel.sink.Milvus = connector-milvus
59 changes: 59 additions & 0 deletions seatunnel-connectors-v2/connector-milvus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-milvus</artifactId>
<name>SeaTunnel : Connectors V2 : Milvus</name>

<properties>
<milvus.version>2.2.2</milvus.version>
<openai.sdk.version>0.12.0</openai.sdk.version>
</properties>

<dependencies>

<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>${milvus.version}</version>
</dependency>

<dependency>
<groupId>com.theokanning.openai-gpt3-java</groupId>
<artifactId>service</artifactId>
<version>${openai.sdk.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.io.Serializable;

public class MilvusOptions implements Serializable {

public static final Option<String> MILVUS_HOST =
Options.key("milvus_host")
.stringType()
.noDefaultValue()
.withDescription("The milvus host");

public static final Option<Integer> MILVUS_PORT =
Options.key("milvus_port")
.intType()
.defaultValue(19530)
.withDescription("This port is for gRPC. Default is 19530");

public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("The username of milvus server.");

public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("The password of milvus server.");

public static final Option<String> COLLECTION_NAME =
Options.key("collection_name")
.stringType()
.noDefaultValue()
.withDescription(
"A collection of milvus, which is similar to a table in a relational database.");

public static final Option<String> PARTITION_FIELD =
Options.key("partition_field")
.stringType()
.noDefaultValue()
.withDescription(
"Partition fields, which must be included in the collection's schema.");

public static final Option<String> OPENAI_ENGINE =
Options.key("openai_engine")
.stringType()
.defaultValue("text-embedding-ada-002")
.withDescription("Text embedding model. Default is 'text-embedding-ada-002'");

public static final Option<String> OPENAI_API_KEY =
Options.key("openai_api_key")
.stringType()
.noDefaultValue()
.withDescription("Use your own Open AI API Key here.");

public static final Option<String> EMBEDDINGS_FIELDS =
Options.key("embeddings_fields")
.stringType()
.noDefaultValue()
.withDescription("Fields to be embedded,They use`,`for splitting");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.milvus.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;

@Setter
@Getter
@ToString
public class MilvusSinkConfig implements Serializable {

private String milvusHost;
private Integer milvusPort;
private String collectionName;
private String partitionField;
private String userName;
private String password;
private String openaiEngine;
private String openaiApiKey;
private String embeddingsFields;

public static MilvusSinkConfig of(ReadonlyConfig config) {
MilvusSinkConfig sinkConfig = new MilvusSinkConfig();
sinkConfig.setMilvusHost(config.get(MilvusOptions.MILVUS_HOST));
sinkConfig.setMilvusPort(config.get(MilvusOptions.MILVUS_PORT));
sinkConfig.setCollectionName(config.get(MilvusOptions.COLLECTION_NAME));
config.getOptional(MilvusOptions.PARTITION_FIELD).ifPresent(sinkConfig::setPartitionField);
config.getOptional(MilvusOptions.USERNAME).ifPresent(sinkConfig::setUserName);
config.getOptional(MilvusOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
config.getOptional(MilvusOptions.OPENAI_ENGINE).ifPresent(sinkConfig::setOpenaiEngine);
config.getOptional(MilvusOptions.OPENAI_API_KEY).ifPresent(sinkConfig::setOpenaiApiKey);
config.getOptional(MilvusOptions.EMBEDDINGS_FIELDS)
.ifPresent(sinkConfig::setEmbeddingsFields);

return sinkConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode {
RESPONSE_FAILED("Milvus-01", "The server returns an exception.");

private final String code;
private final String description;

MilvusConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class MilvusConnectorException extends SeaTunnelRuntimeException {
public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public MilvusConnectorException(
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}
}
Loading
Loading