Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Introduce milvus sink connector #4885

Closed
wants to merge 21 commits into from
67 changes: 67 additions & 0 deletions docs/en/connector-v2/sink/Milvus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
> Milvus sink connector

## Support These Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
liugddx marked this conversation as resolved.
Show resolved Hide resolved
- [ ] [cdc](../../concept/connector-v2-features.md)

## Description

Write data to Apache milvus.

## Sink Options

| Name | Type | Required | Default | Description |
|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------|
| milvus_host | String | Yes | - | The milvus host. |
| milvus_port | Int | No | 19530 | This port is for gRPC. Default is 19530. |
| username | String | Yes | - | The username of milvus server. |
| password | String | Yes | - | The password of milvus server. |
| collection_name | String | No | - | A collection of milvus, which is similar to a table in a relational database. |
| partition_field | String | No | - | Partition fields, which must be included in the collection's schema. |
| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. Default is 'text-embedding-ada-002'. |
| openai_api_key | String | No | - | Use your own Open AI API Key here. |
| embeddings_fields | String | No | - | Fields to be embedded,They use`,`for splitting. |

### Data Type Mapping

| Milvus Data type | SeaTunnel Data type |
|------------------|---------------------|
| Bool | BOOLEAN |
| Int8 | TINYINT |
| Int16 | SMALLINT |
| Int32 | INT |
| Int64 | BIGINT |
| Float | FLOAT |
| Double | DOUBLE |
| VarChar | DECIMAL |
| String | STRING |

## Examples

```hocon
sink {
Milvus {
milvus_host = localhost
milvus_port = 19530
username = root
password = Milvus
collection_name = title_db
openai_engine = text-embedding-ada-002
openai_api_key = sk-xxxx
embeddings_fields = title_2
}
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should put a demo can run without any modify. Not just sink config. It's more useful.


## Changelog

### next version

- Add Milvus Sink Connector

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,4 @@ seatunnel.source.Rocketmq = connector-rocketmq
seatunnel.sink.Rocketmq = connector-rocketmq
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
seatunnel.sink.Milvus = connector-milvus
59 changes: 59 additions & 0 deletions seatunnel-connectors-v2/connector-milvus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-milvus</artifactId>
<name>SeaTunnel : Connectors V2 : Milvus</name>

<properties>
<milvus.version>2.2.2</milvus.version>
<openai.sdk.version>0.12.0</openai.sdk.version>
</properties>

<dependencies>

<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>${milvus.version}</version>
</dependency>

<dependency>
<groupId>com.theokanning.openai-gpt3-java</groupId>
<artifactId>service</artifactId>
<version>${openai.sdk.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.SeaTunnelSink;

import java.io.Serializable;

/** Utility class to milvus configuration options, used by {@link SeaTunnelSink}. */
public class MilvusConfig implements Serializable {

public static final Option<String> MILVUS_HOST =
Options.key("milvus_host")
.stringType()
.noDefaultValue()
.withDescription("The milvus host");

public static final Option<Integer> MILVUS_PORT =
Options.key("milvus_port")
.intType()
.defaultValue(19530)
.withDescription("This port is for gRPC. Default is 19530");

public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("The username of milvus server.");

public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("The password of milvus server.");

public static final Option<String> COLLECTION_NAME =
Options.key("collection_name")
.stringType()
.noDefaultValue()
.withDescription(
"A collection of milvus, which is similar to a table in a relational database.");

public static final Option<String> PARTITION_FIELD =
Options.key("partition_field")
.stringType()
.noDefaultValue()
.withDescription(
"Partition fields, which must be included in the collection's schema.");

public static final Option<String> OPENAI_ENGINE =
Options.key("openai_engine")
.stringType()
.defaultValue("text-embedding-ada-002")
.withDescription("Text embedding model. Default is 'text-embedding-ada-002'");

public static final Option<String> OPENAI_API_KEY =
Options.key("openai_api_key")
.stringType()
.noDefaultValue()
.withDescription("Use your own Open AI API Key here.");

public static final Option<String> EMBEDDINGS_FIELDS =
Options.key("embeddings_fields")
.stringType()
.noDefaultValue()
.withDescription("Fields to be embedded,They use`,`for splitting");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class MilvusOptions implements Serializable {

private String milvusHost;
private Integer milvusPort;
private String collectionName;
private String partitionField;
private String userName;
private String password;
private String openaiEngine;
private String openaiApiKey;
private String embeddingsFields;

public MilvusOptions(Config config) {
this.milvusHost = config.getString(MilvusConfig.MILVUS_HOST.key());
if (config.hasPath(MilvusConfig.MILVUS_PORT.key())) {
this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key());
} else {
this.milvusPort = MilvusConfig.MILVUS_PORT.defaultValue();
}
this.collectionName = config.getString(MilvusConfig.COLLECTION_NAME.key());
this.userName = config.getString(MilvusConfig.USERNAME.key());
this.password = config.getString(MilvusConfig.PASSWORD.key());

if (config.hasPath(MilvusConfig.PARTITION_FIELD.key())) {
this.partitionField = config.getString(MilvusConfig.PARTITION_FIELD.key());
}
if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())) {
this.openaiEngine = config.getString(MilvusConfig.OPENAI_ENGINE.key());
} else {
this.openaiEngine = MilvusConfig.OPENAI_ENGINE.defaultValue();
}
if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())) {
this.openaiApiKey = config.getString(MilvusConfig.OPENAI_API_KEY.key());
}
if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())) {
this.embeddingsFields = config.getString(MilvusConfig.EMBEDDINGS_FIELDS.key());
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);. Then just use readonlyConfig.get(MilvusConfig.MILVUS_HOST), it will hande default value.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode {
RESPONSE_FAILED("Milvus-01", "The server returns an exception.");

private final String code;
private final String description;

MilvusConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class MilvusConnectorException extends SeaTunnelRuntimeException {
public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public MilvusConnectorException(
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}
}
Loading
Loading