[Feature][Connector-V2] Introduce milvus sink connector #4885
Closed
21 commits
e4232e5 add milvus connector (liugddx)
8ed28ab fix code style (liugddx)
cead8d7 add connector-milvus in dist (liugddx)
5724ef0 close openai client (liugddx)
6fb2120 add license header (liugddx)
4568e9b fix some bug (liugddx)
46be4a8 add doc (liugddx)
83f2288 add collection check (liugddx)
4adf01a fix dead line (liugddx)
95e54b4 add some argument check
2abebf4 add e2e test case
4b7c4b5 add e2e test case
6a51371 add license
d3fd9c0 add license
f7d01b8 fix some bug
d0c00f0 add cdc feature
92b11c8 format doc
fe55be8 format doc (liugddx)
39e809a fix error (liugddx)
c13adcb fix error (liugddx)
f6ad8ad fix error (liugddx)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
> Milvus sink connector | ||
|
||
## Support These Engines | ||
|
||
> Spark<br/> | ||
> Flink<br/> | ||
> SeaTunnel Zeta<br/> | ||
|
||
## Key Features | ||
|
||
- [ ] [exactly-once](../../concept/connector-v2-features.md) | ||
- [ ] [cdc](../../concept/connector-v2-features.md) | ||
|
||
## Description | ||
|
||
Write data to Milvus. | ||
|
||
## Sink Options | ||
|
||
| Name | Type | Required | Default | Description | | ||
|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------| | ||
| milvus_host | String | Yes | - | The Milvus host. | | ||
| milvus_port | Int | No | 19530 | The gRPC port of the Milvus server. | | ||
| username | String | Yes | - | The username for the Milvus server. | | ||
| password | String | Yes | - | The password for the Milvus server. | | ||
| collection_name | String | No | - | A Milvus collection, which is similar to a table in a relational database. | | ||
| partition_field | String | No | - | Partition fields, which must be included in the collection's schema. | | ||
| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. | | ||
| openai_api_key | String | No | - | Your own OpenAI API key. | | ||
| embeddings_fields | String | No | - | Fields to be embedded, separated by `,`. | | ||
|
||
### Data Type Mapping | ||
|
||
| Milvus Data type | SeaTunnel Data type | | ||
|------------------|---------------------| | ||
| Bool | BOOLEAN | | ||
| Int8 | TINYINT | | ||
| Int16 | SMALLINT | | ||
| Int32 | INT | | ||
| Int64 | BIGINT | | ||
| Float | FLOAT | | ||
| Double | DOUBLE | | ||
| VarChar | DECIMAL | | ||
| String | STRING | | ||
|
||
## Examples | ||
|
||
```hocon | ||
env { | ||
# You can set engine configuration here | ||
execution.parallelism = 1 | ||
job.mode = "BATCH" | ||
checkpoint.interval = 5000 | ||
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" | ||
} | ||
|
||
source { | ||
# This is an example source plugin **only for testing and demonstrating the source plugin feature** | ||
LocalFile { | ||
schema { | ||
fields { | ||
bookID = string | ||
title_1 = string | ||
title_2 = string | ||
} | ||
} | ||
path = "/tmp/milvus_test/book" | ||
file_format_type = "csv" | ||
} | ||
} | ||
|
||
transform { | ||
} | ||
|
||
sink { | ||
Milvus { | ||
milvus_host = localhost | ||
milvus_port = 19530 | ||
username = root | ||
password = Milvus | ||
collection_name = title_db | ||
openai_engine = text-embedding-ada-002 | ||
openai_api_key = sk-xxxx | ||
embeddings_fields = title_2 | ||
} | ||
} | ||
``` | ||
|
||
## Changelog | ||
|
||
### next version | ||
|
||
- Add Milvus Sink Connector | ||
|
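The data type mapping table in the docs file above can be mirrored as a small lookup. This is only a sketch: the class and method names are hypothetical, the rows are copied from the table exactly as written (including `VarChar` for `DECIMAL`), and nothing here is taken from the connector's actual writer code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup that mirrors the "Data Type Mapping" table in the docs.
// It is not the connector's implementation.
final class MilvusTypeMappingSketch {

    private static final Map<String, String> SEATUNNEL_TO_MILVUS;

    static {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("BOOLEAN", "Bool");
        mapping.put("TINYINT", "Int8");
        mapping.put("SMALLINT", "Int16");
        mapping.put("INT", "Int32");
        mapping.put("BIGINT", "Int64");
        mapping.put("FLOAT", "Float");
        mapping.put("DOUBLE", "Double");
        mapping.put("DECIMAL", "VarChar"); // as listed in the docs table
        mapping.put("STRING", "String");
        SEATUNNEL_TO_MILVUS = Collections.unmodifiableMap(mapping);
    }

    private MilvusTypeMappingSketch() {}

    // Returns the Milvus type name documented for the given SeaTunnel type name.
    static String toMilvusType(String seaTunnelType) {
        String milvusType = SEATUNNEL_TO_MILVUS.get(seaTunnelType.toUpperCase(Locale.ROOT));
        if (milvusType == null) {
            throw new IllegalArgumentException("Unsupported SeaTunnel type: " + seaTunnelType);
        }
        return milvusType;
    }
}
```

Failing fast on an unlisted type is a choice made for this sketch; the diff shown here does not reveal how the sink treats types outside the table.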
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
|
||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
|
||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>seatunnel-connectors-v2</artifactId> | ||
<version>${revision}</version> | ||
</parent> | ||
|
||
<artifactId>connector-milvus</artifactId> | ||
<name>SeaTunnel : Connectors V2 : Milvus</name> | ||
|
||
<properties> | ||
<milvus.version>2.2.2</milvus.version> | ||
<openai.sdk.version>0.12.0</openai.sdk.version> | ||
</properties> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>io.milvus</groupId> | ||
<artifactId>milvus-sdk-java</artifactId> | ||
<version>${milvus.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.theokanning.openai-gpt3-java</groupId> | ||
<artifactId>service</artifactId> | ||
<version>${openai.sdk.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>connector-common</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
</project> |
82 changes: 82 additions & 0 deletions
.../src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.milvus.config; | ||
|
||
import org.apache.seatunnel.api.configuration.Option; | ||
import org.apache.seatunnel.api.configuration.Options; | ||
|
||
import java.io.Serializable; | ||
|
||
public class MilvusOptions implements Serializable { | ||
|
||
public static final Option<String> MILVUS_HOST = | ||
Options.key("milvus_host") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("The milvus host"); | ||
|
||
public static final Option<Integer> MILVUS_PORT = | ||
Options.key("milvus_port") | ||
.intType() | ||
.defaultValue(19530) | ||
.withDescription("This port is for gRPC. Default is 19530"); | ||
|
||
public static final Option<String> USERNAME = | ||
Options.key("username") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("The username of milvus server."); | ||
|
||
public static final Option<String> PASSWORD = | ||
Options.key("password") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("The password of milvus server."); | ||
|
||
public static final Option<String> COLLECTION_NAME = | ||
Options.key("collection_name") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription( | ||
"A collection of milvus, which is similar to a table in a relational database."); | ||
|
||
public static final Option<String> PARTITION_FIELD = | ||
Options.key("partition_field") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription( | ||
"Partition fields, which must be included in the collection's schema."); | ||
|
||
public static final Option<String> OPENAI_ENGINE = | ||
Options.key("openai_engine") | ||
.stringType() | ||
.defaultValue("text-embedding-ada-002") | ||
.withDescription("Text embedding model. Default is 'text-embedding-ada-002'"); | ||
|
||
public static final Option<String> OPENAI_API_KEY = | ||
Options.key("openai_api_key") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("Use your own OpenAI API key here."); | ||
|
||
public static final Option<String> EMBEDDINGS_FIELDS = | ||
Options.key("embeddings_fields") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("Fields to be embedded, separated by `,`"); | ||
} |
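Since `embeddings_fields` is declared as a single comma-separated string, a consumer has to split it before use. A minimal sketch of that step follows; `EmbeddingsFieldsParser` is a hypothetical helper that does not appear in this PR, and trimming whitespace and skipping empty segments are assumptions rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of this PR: splits the raw `embeddings_fields`
// option value into individual field names.
final class EmbeddingsFieldsParser {

    private EmbeddingsFieldsParser() {}

    // Returns the trimmed, non-empty field names in their original order.
    static List<String> parse(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return Collections.emptyList();
        }
        List<String> fields = new ArrayList<>();
        for (String part : rawValue.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                fields.add(name);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Example value in the style of the docs' sink configuration.
        System.out.println(parse("title_1, title_2"));
    }
}
```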
57 changes: 57 additions & 0 deletions
...c/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seatunnel.connectors.seatunnel.milvus.config; | ||
|
||
import org.apache.seatunnel.api.configuration.ReadonlyConfig; | ||
|
||
import lombok.Getter; | ||
import lombok.Setter; | ||
import lombok.ToString; | ||
|
||
import java.io.Serializable; | ||
|
||
@Setter | ||
@Getter | ||
@ToString | ||
public class MilvusSinkConfig implements Serializable { | ||
|
||
private String milvusHost; | ||
private Integer milvusPort; | ||
private String collectionName; | ||
private String partitionField; | ||
private String userName; | ||
private String password; | ||
private String openaiEngine; | ||
private String openaiApiKey; | ||
private String embeddingsFields; | ||
|
||
public static MilvusSinkConfig of(ReadonlyConfig config) { | ||
MilvusSinkConfig sinkConfig = new MilvusSinkConfig(); | ||
sinkConfig.setMilvusHost(config.get(MilvusOptions.MILVUS_HOST)); | ||
sinkConfig.setMilvusPort(config.get(MilvusOptions.MILVUS_PORT)); | ||
sinkConfig.setCollectionName(config.get(MilvusOptions.COLLECTION_NAME)); | ||
config.getOptional(MilvusOptions.PARTITION_FIELD).ifPresent(sinkConfig::setPartitionField); | ||
config.getOptional(MilvusOptions.USERNAME).ifPresent(sinkConfig::setUserName); | ||
config.getOptional(MilvusOptions.PASSWORD).ifPresent(sinkConfig::setPassword); | ||
config.getOptional(MilvusOptions.OPENAI_ENGINE).ifPresent(sinkConfig::setOpenaiEngine); | ||
config.getOptional(MilvusOptions.OPENAI_API_KEY).ifPresent(sinkConfig::setOpenaiApiKey); | ||
config.getOptional(MilvusOptions.EMBEDDINGS_FIELDS) | ||
.ifPresent(sinkConfig::setEmbeddingsFields); | ||
|
||
return sinkConfig; | ||
} | ||
} |
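`MilvusSinkConfig.of` reads `milvus_host`, `milvus_port`, and `collection_name` with `config.get`, and the remaining options through `getOptional`. The sketch below shows one way the mandatory values could be checked up front. It uses a plain `Map` in place of `ReadonlyConfig` so that it runs on its own; the class name, the error messages, and the decision to treat `collection_name` as mandatory are all assumptions, not something the PR states.

```java
import java.util.Map;

// Hypothetical stand-in for the connector's config handling; it uses a plain
// Map instead of SeaTunnel's ReadonlyConfig so the sketch is self-contained.
final class MilvusConfigSketch {

    static final int DEFAULT_PORT = 19530; // default declared by MilvusOptions.MILVUS_PORT

    final String host;
    final int port;
    final String collectionName;

    private MilvusConfigSketch(String host, int port, String collectionName) {
        this.host = host;
        this.port = port;
        this.collectionName = collectionName;
    }

    // Builds the config, rejecting missing mandatory options and invalid ports.
    static MilvusConfigSketch of(Map<String, String> options) {
        String host = require(options, "milvus_host");
        // Assumption: collection_name is treated as mandatory in this sketch.
        String collectionName = require(options, "collection_name");
        String rawPort = options.get("milvus_port");
        int port = rawPort == null ? DEFAULT_PORT : Integer.parseInt(rawPort.trim());
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("milvus_port out of range: " + port);
        }
        return new MilvusConfigSketch(host, port, collectionName);
    }

    private static String require(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        return value.trim();
    }
}
```

Validating in the factory method means a misconfigured job would fail before any connection is opened, rather than on the first write.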
42 changes: 42 additions & 0 deletions
.../org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.milvus.exception; | ||
|
||
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; | ||
|
||
public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode { | ||
RESPONSE_FAILED("Milvus-01", "The server returns an exception."); | ||
|
||
private final String code; | ||
private final String description; | ||
|
||
MilvusConnectorErrorCode(String code, String description) { | ||
this.code = code; | ||
this.description = description; | ||
} | ||
|
||
@Override | ||
public String getCode() { | ||
return code; | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return description; | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
.../org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.milvus.exception; | ||
|
||
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; | ||
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; | ||
|
||
public class MilvusConnectorException extends SeaTunnelRuntimeException { | ||
public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { | ||
super(seaTunnelErrorCode, errorMessage); | ||
} | ||
|
||
public MilvusConnectorException( | ||
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { | ||
super(seaTunnelErrorCode, errorMessage, cause); | ||
} | ||
} |
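To illustrate how the `RESPONSE_FAILED` code and the exception class might be used together, here is a self-contained sketch. The nested types are stand-ins for `MilvusConnectorErrorCode` and `MilvusConnectorException` (the real ones implement and extend SeaTunnel types). The `checkResponse` helper, its integer status convention, and the message layout are all hypothetical.

```java
// Self-contained stand-ins for MilvusConnectorErrorCode and
// MilvusConnectorException; the real classes build on SeaTunnel types.
final class MilvusErrorSketch {

    enum ErrorCode {
        RESPONSE_FAILED("Milvus-01", "The server returns an exception.");

        private final String code;
        private final String description;

        ErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        String getCode() {
            return code;
        }

        String getDescription() {
            return description;
        }
    }

    static final class ConnectorException extends RuntimeException {
        private final ErrorCode errorCode;

        ConnectorException(ErrorCode errorCode, String errorMessage) {
            // The message layout is this sketch's own choice.
            super("[" + errorCode.getCode() + "] " + errorCode.getDescription() + " " + errorMessage);
            this.errorCode = errorCode;
        }

        ErrorCode getErrorCode() {
            return errorCode;
        }
    }

    private MilvusErrorSketch() {}

    // Hypothetical check: a status of 0 is treated as success in this sketch.
    static void checkResponse(int status, String detail) {
        if (status != 0) {
            throw new ConnectorException(
                    ErrorCode.RESPONSE_FAILED, "status=" + status + ", detail=" + detail);
        }
    }
}
```

Carrying the error code on the exception lets callers branch on a stable identifier such as `Milvus-01` instead of parsing message text.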