diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md new file mode 100644 index 00000000000..85ead00f647 --- /dev/null +++ b/docs/en/connector-v2/sink/Milvus.md @@ -0,0 +1,94 @@ +> Milvus sink connector + +## Support These Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +## Description + +Write data to Milvus. + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------| +| milvus_host | String | Yes | - | The Milvus host. | +| milvus_port | Int | No | 19530 | This port is for gRPC. Default is 19530. | +| username | String | Yes | - | The username of the Milvus server. | +| password | String | Yes | - | The password of the Milvus server. | +| collection_name | String | Yes | - | A Milvus collection, which is similar to a table in a relational database. | +| partition_field | String | No | - | The field whose value is used as the partition name; it must be included in the collection's schema. | +| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. Default is 'text-embedding-ada-002'. | +| openai_api_key | String | No | - | Your OpenAI API key; used only when `embeddings_fields` is set. | +| embeddings_fields | String | No | - | Names of the fields to embed, separated by `,`. 
| + +### Data Type Mapping + +| Milvus Data type | SeaTunnel Data type | +|------------------|---------------------| +| Bool | BOOLEAN | +| Int8 | TINYINT | +| Int16 | SMALLINT | +| Int32 | INT | +| Int64 | BIGINT | +| Float | FLOAT | +| Double | DOUBLE | +| VarChar | STRING | +| String | STRING | + +## Examples + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + LocalFile { + schema { + fields { + bookID = string + title_1 = string + title_2 = string + } + } + path = "/tmp/milvus_test/book" + file_format_type = "csv" + } +} + +transform { +} + +sink { + Milvus { + milvus_host = localhost + milvus_port = 19530 + username = root + password = Milvus + collection_name = title_db + openai_engine = text-embedding-ada-002 + openai_api_key = sk-xxxx + embeddings_fields = title_2 + } +} +``` + +## Changelog + +### next version + +- Add Milvus Sink Connector + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index de6593b4523..d2a743a1021 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -110,3 +110,4 @@ seatunnel.source.Rocketmq = connector-rocketmq seatunnel.sink.Rocketmq = connector-rocketmq seatunnel.source.Paimon = connector-paimon seatunnel.sink.Paimon = connector-paimon +seatunnel.sink.Milvus = connector-milvus \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml b/seatunnel-connectors-v2/connector-milvus/pom.xml new file mode 100644 index 00000000000..42e84e3687d --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-milvus + SeaTunnel : Connectors V2 : Milvus + + + 2.2.2 + 0.12.0 + + + + + + 
io.milvus
+ milvus-sdk-java
+ ${milvus.version}
+
+
+
+ com.theokanning.openai-gpt3-java
+ service
+ ${openai.sdk.version}
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java
new file mode 100644
index 00000000000..f64b3fd731b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+/**
+ * Option definitions (key, value type, default and description) for the Milvus sink.
+ *
+ * <p>Only {@code milvus_port} and {@code openai_engine} declare a default value; every other
+ * option is declared with {@code noDefaultValue()}.
+ */
+public class MilvusOptions implements Serializable {
+
+    // ---- connection ----
+
+    public static final Option MILVUS_HOST =
+            Options.key("milvus_host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The milvus host");
+
+    public static final Option MILVUS_PORT =
+            Options.key("milvus_port")
+                    .intType()
+                    .defaultValue(19530)
+                    .withDescription("This port is for gRPC. Default is 19530");
+
+    public static final Option USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The username of milvus server.");
+
+    public static final Option PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The password of milvus server.");
+
+    // ---- target collection ----
+
+    public static final Option COLLECTION_NAME =
+            Options.key("collection_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A collection of milvus, which is similar to a table in a relational database.");
+
+    public static final Option PARTITION_FIELD =
+            Options.key("partition_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Partition fields, which must be included in the collection's schema.");
+
+    // ---- OpenAI text embedding ----
+
+    public static final Option OPENAI_ENGINE =
+            Options.key("openai_engine")
+                    .stringType()
+                    .defaultValue("text-embedding-ada-002")
+                    .withDescription("Text embedding model. Default is 'text-embedding-ada-002'");
+
+    public static final Option OPENAI_API_KEY =
+            Options.key("openai_api_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Use your own Open AI API Key here.");
+
+    // A single string; the sink writer splits it on ',' to obtain the field names.
+    public static final Option EMBEDDINGS_FIELDS =
+            Options.key("embeddings_fields")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Fields to be embedded,They use`,`for splitting");
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
new file mode 100644
index 00000000000..56aff7d6550
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+/**
+ * Serializable snapshot of the Milvus sink options, built once from the job configuration and
+ * handed to the sink writer.
+ *
+ * <p>The credentials ({@code password}, {@code openaiApiKey}) are excluded from
+ * {@link #toString()} so that logging the config does not print secrets.
+ */
+@Setter
+@Getter
+@ToString(exclude = {"password", "openaiApiKey"})
+public class MilvusSinkConfig implements Serializable {
+
+    private String milvusHost;
+    private Integer milvusPort;
+    private String collectionName;
+    private String partitionField;
+    private String userName;
+    private String password;
+    private String openaiEngine;
+    private String openaiApiKey;
+    private String embeddingsFields;
+
+    /**
+     * Builds the sink config from the plugin options.
+     *
+     * @param config read-only view of the sink's plugin configuration
+     * @return a populated config; options that are absent and declare no default stay {@code null}
+     */
+    public static MilvusSinkConfig of(ReadonlyConfig config) {
+        MilvusSinkConfig sinkConfig = new MilvusSinkConfig();
+        sinkConfig.setMilvusHost(config.get(MilvusOptions.MILVUS_HOST));
+        sinkConfig.setMilvusPort(config.get(MilvusOptions.MILVUS_PORT));
+        sinkConfig.setCollectionName(config.get(MilvusOptions.COLLECTION_NAME));
+        // Read with get(), like milvus_port above, so the default declared on OPENAI_ENGINE
+        // ("text-embedding-ada-002") is used when the option is not configured.
+        sinkConfig.setOpenaiEngine(config.get(MilvusOptions.OPENAI_ENGINE));
+        config.getOptional(MilvusOptions.PARTITION_FIELD).ifPresent(sinkConfig::setPartitionField);
+        config.getOptional(MilvusOptions.USERNAME).ifPresent(sinkConfig::setUserName);
+        config.getOptional(MilvusOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+        config.getOptional(MilvusOptions.OPENAI_API_KEY).ifPresent(sinkConfig::setOpenaiApiKey);
+        config.getOptional(MilvusOptions.EMBEDDINGS_FIELDS)
+                .ifPresent(sinkConfig::setEmbeddingsFields);
+
+        return sinkConfig;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java
new file mode 100644
index 00000000000..96c163aaeb5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/** Error codes raised by the Milvus connector; each constant pairs a code with a description. */
+public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode {
+    RESPONSE_FAILED("Milvus-01", "The server returns an exception.");
+
+    private final String code;
+    private final String description;
+
+    MilvusConnectorErrorCode(String errorCode, String errorDescription) {
+        code = errorCode;
+        description = errorDescription;
+    }
+
+    /** @return the human-readable description of this error */
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @return the connector-scoped error identifier, e.g. {@code Milvus-01} */
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
new file mode 100644
index 00000000000..88ee8d105a4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/** Runtime exception thrown by the Milvus connector; carries an error code and a message. */
+public class MilvusConnectorException extends SeaTunnelRuntimeException {
+
+    /** Creates an exception that also records the underlying {@code cause}. */
+    public MilvusConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    /** Creates an exception with an error code and message only. */
+    public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
new file mode 100644
index 00000000000..6f266e46ed6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Sink plugin registered under the name {@code Milvus}; it validates the plugin options and
+ * creates a {@link MilvusSinkWriter} for the configured collection.
+ */
+@AutoService(SeaTunnelSink.class)
+public class MilvusSink extends AbstractSimpleSink {
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private MilvusSinkConfig milvusSinkConfig;
+
+    @Override
+    public String getPluginName() {
+        return "Milvus";
+    }
+
+    /**
+     * Validates the plugin options against {@link MilvusSinkFactory#optionRule()} and then builds
+     * the sink config from them.
+     *
+     * @param pluginConfig the sink section of the job configuration
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        // Wrap once and validate first, so a missing required option is reported before any
+        // option value is read into the sink config.
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
+        ConfigValidator.of(readonlyConfig).validate(new MilvusSinkFactory().optionRule());
+        milvusSinkConfig = MilvusSinkConfig.of(readonlyConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /** Creates a writer from the config prepared in {@link #prepare(Config)}. */
+    @Override
+    public AbstractSinkWriter createWriter(SinkWriter.Context context) {
+        return new MilvusSinkWriter(milvusSinkConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
new file mode 100644
index 00000000000..a256425d756
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for the {@code Milvus} sink; declares which options are required and optional. */
+@AutoService(Factory.class)
+public class MilvusSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+
+    /**
+     * Returns the validation rule for the sink options.
+     *
+     * @return host, collection name and credentials as required; everything else as optional
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        MilvusOptions.MILVUS_HOST,
+                        MilvusOptions.COLLECTION_NAME,
+                        MilvusOptions.USERNAME,
+                        MilvusOptions.PASSWORD)
+                .optional(
+                        // milvus_port declares a default (19530), so users may omit it.
+                        MilvusOptions.MILVUS_PORT,
+                        MilvusOptions.PARTITION_FIELD,
+                        MilvusOptions.OPENAI_ENGINE,
+                        MilvusOptions.OPENAI_API_KEY,
+                        MilvusOptions.EMBEDDINGS_FIELDS)
+                .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
new file mode 100644
index 00000000000..5fe5ff9a504
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import com.theokanning.openai.embedding.EmbeddingRequest;
+import com.theokanning.openai.embedding.EmbeddingResult;
+import com.theokanning.openai.service.OpenAiService;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.FlushParam;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.response.DescCollResponseWrapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorErrorCode.RESPONSE_FAILED;
+
+/**
+ * Sink writer that inserts every incoming {@link SeaTunnelRow} into one Milvus collection.
+ *
+ * <p>The collection's field list is fetched once in the constructor. Fields named in
+ * {@code embeddings_fields} are replaced by an OpenAI embedding of their string value; all other
+ * fields are type-checked and inserted as they are.
+ */
+public class MilvusSinkWriter extends AbstractSinkWriter {
+
+    private final MilvusServiceClient milvusClient;
+
+    private final MilvusSinkConfig milvusSinkConfig;
+
+    // Only created when embeddings_fields is configured; null otherwise.
+    private OpenAiService service;
+
+    // Field definitions of the target collection, as returned by describeCollection.
+    private final List metaFields;
+
+    /**
+     * Connects to Milvus, loads the target collection's field definitions and, when embeddings
+     * are configured, creates the OpenAI client.
+     *
+     * @param milvusSinkConfig connection, collection and embedding settings
+     * @throws MilvusConnectorException if a Milvus call returns a non-success status
+     */
+    public MilvusSinkWriter(MilvusSinkConfig milvusSinkConfig) {
+        this.milvusSinkConfig = milvusSinkConfig;
+        ConnectParam connectParam =
+                ConnectParam.newBuilder()
+                        .withHost(milvusSinkConfig.getMilvusHost())
+                        .withPort(milvusSinkConfig.getMilvusPort())
+                        .withAuthorization(
+                                milvusSinkConfig.getUserName(), milvusSinkConfig.getPassword())
+                        .build();
+        milvusClient = new MilvusServiceClient(connectParam);
+
+        // NOTE(review): only the response status is checked here; the payload of hasCollection
+        // is not inspected. Also, if any check below throws, the client opened above is not
+        // closed - confirm whether the caller cleans up.
+        handleResponseStatus(
+                milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withCollectionName(milvusSinkConfig.getCollectionName())
+                                .build()));
+
+        R describeCollectionResponseR =
+                milvusClient.describeCollection(
+                        DescribeCollectionParam.newBuilder()
+                                .withCollectionName(milvusSinkConfig.getCollectionName())
+                                .build());
+
+        handleResponseStatus(describeCollectionResponseR);
+
+        DescCollResponseWrapper wrapper =
+                new DescCollResponseWrapper(describeCollectionResponseR.getData());
+
+        this.metaFields = wrapper.getFields();
+
+        if (milvusSinkConfig.getEmbeddingsFields() != null) {
+            service = new OpenAiService(milvusSinkConfig.getOpenaiApiKey());
+        }
+    }
+
+    /**
+     * Inserts one row into the collection.
+     *
+     * <p>The i-th collection field is filled from the i-th row field, so the row layout has to
+     * follow the collection schema order.
+     *
+     * @param element the row to insert
+     * @throws MilvusConnectorException if a value has an unsupported type or the insert fails
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+
+        List fields = new ArrayList<>();
+
+        InsertParam.Builder builder = InsertParam.newBuilder();
+
+        builder = builder.withCollectionName(milvusSinkConfig.getCollectionName());
+
+        for (int i = 0; i < this.metaFields.size(); i++) {
+
+            FieldType fieldType = this.metaFields.get(i);
+
+            Object field = element.getField(i);
+            if (fieldType.isPrimaryKey()) {
+                if (!(field instanceof Long)
+                        && !(field instanceof Integer)
+                        && !(field instanceof Byte)
+                        && !(field instanceof Short)
+                        && !(field instanceof String)) {
+                    throw new MilvusConnectorException(
+                            ILLEGAL_ARGUMENT, "Primary key field only supports number and string.");
+                }
+            }
+
+            // The value of the configured partition field names the target partition.
+            if (milvusSinkConfig.getPartitionField() != null
+                    && milvusSinkConfig.getPartitionField().equals(fieldType.getName())) {
+                builder.withPartitionName(String.valueOf(field));
+            }
+            if (milvusSinkConfig.getEmbeddingsFields() != null) {
+                // NOTE(review): names are split on ',' without trimming, so "a, b" yields " b".
+                List embeddingsFields =
+                        Arrays.asList(milvusSinkConfig.getEmbeddingsFields().split(","));
+                if (embeddingsFields.contains(fieldType.getName())) {
+                    if (fieldType.getDataType() != DataType.BinaryVector
+                            && fieldType.getDataType() != DataType.FloatVector) {
+                        throw new MilvusConnectorException(
+                                ILLEGAL_ARGUMENT, "Vector field only supports binary and float.");
+                    }
+                    // One embedding request per field per row, on the value's string form.
+                    EmbeddingResult embeddings =
+                            service.createEmbeddings(
+                                    EmbeddingRequest.builder()
+                                            .model(milvusSinkConfig.getOpenaiEngine())
+                                            .input(Collections.singletonList(String.valueOf(field)))
+                                            .build());
+                    List embedding = embeddings.getData().get(0).getEmbedding();
+                    // NOTE(review): a list of floats is produced even when the field is a
+                    // BinaryVector - confirm that Milvus accepts this.
+                    List collect =
+                            embedding.stream().map(Double::floatValue).collect(Collectors.toList());
+                    InsertParam.Field insertField =
+                            new InsertParam.Field(
+                                    fieldType.getName(), Collections.singletonList(collect));
+                    fields.add(insertField);
+                    continue;
+                }
+            }
+
+            judgmentParameterType(fieldType, field);
+
+            InsertParam.Field insertField =
+                    new InsertParam.Field(fieldType.getName(), Collections.singletonList(field));
+            fields.add(insertField);
+        }
+
+        InsertParam build = builder.withFields(fields).build();
+
+        handleResponseStatus(milvusClient.insert(build));
+    }
+
+    /** Flushes the collection on checkpoint; this writer keeps no state, so the list is empty. */
+    @Override
+    public List snapshotState(long checkpointId) throws IOException {
+        milvusClient.flush(
+                FlushParam.newBuilder()
+                        .addCollectionName(milvusSinkConfig.getCollectionName())
+                        .build());
+        return Collections.emptyList();
+    }
+
+    /**
+     * Releases the Milvus client and, if one was created, the OpenAI client.
+     *
+     * <p>{@code service} exists only when {@code embeddings_fields} is configured, so it is
+     * null-checked; the {@code finally} block ensures it is shut down even if closing the Milvus
+     * client throws.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            milvusClient.close();
+        } finally {
+            if (service != null) {
+                service.shutdownExecutor();
+            }
+        }
+    }
+
+    /** Throws {@link MilvusConnectorException} when the response status is not success. */
+    private void handleResponseStatus(R r) {
+        if (r.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(RESPONSE_FAILED, r.getMessage(), r.getException());
+        }
+    }
+
+    /**
+     * Checks that {@code value} has a Java type accepted for the Milvus field's data type.
+     *
+     * <p>A {@code null} value fails every {@code instanceof} check and is therefore rejected, as
+     * is any field data type not listed below.
+     *
+     * @throws MilvusConnectorException with {@code UNSUPPORTED_DATA_TYPE} on a mismatch
+     */
+    private void judgmentParameterType(FieldType fieldType, Object value) {
+        switch (fieldType.getDataType()) {
+            case Bool:
+                if (!(value instanceof Boolean)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            case Int8:
+            case Int16:
+            case Int32:
+                if (!(value instanceof Integer)
+                        && !(value instanceof Byte)
+                        && !(value instanceof Short)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            case Int64:
+                if (!(value instanceof Long)
+                        && !(value instanceof Integer)
+                        && !(value instanceof Byte)
+                        && !(value instanceof Short)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            case Float:
+                if (!(value instanceof Float)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            case Double:
+                if (!(value instanceof Float) && !(value instanceof Double)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            case VarChar:
+                if (!(value instanceof String)) {
+                    throw new MilvusConnectorException(
+                            UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+                }
+                break;
+            default:
+                throw new MilvusConnectorException(
+                        UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 8af51a0d0e5..6f70ecd3323 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -73,6 +73,7 @@
 connector-hbase
 connector-rocketmq
 connector-paimon
+ connector-milvus
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 80ef7209da3..bd8dfd2a4e2 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -509,6 +509,13 @@
 provided
+
+ org.apache.seatunnel
+ connector-milvus
+ 
${project.version} + provided + + com.aliyun.phoenix diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml new file mode 100644 index 00000000000..a88f2378438 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml @@ -0,0 +1,55 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + + connector-milvus-v2-e2e + + SeaTunnel : E2E : Connector V2 : Milvus + + + 2.2.2 + + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-milvus + ${project.version} + test + + + io.milvus + milvus-sdk-java + ${milvus.version} + test + + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java new file mode 100644 index 00000000000..6c4e6580785 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.milvus;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.QueryResults;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.dml.QueryParam;
+
+import java.io.IOException;
+
+/**
+ * End-to-end test that runs {@code /fake_to_milvus.conf} and then queries the target collection.
+ *
+ * <p>The test is {@code @Disabled}. It does not start a Milvus container itself; it connects to
+ * the fixed address in {@link #host}, so presumably it needs an externally provisioned server
+ * with the {@code title_db} collection already created - TODO confirm.
+ */
+@Disabled
+public class MilvusIT extends TestSuiteBase implements TestResource {
+
+    private MilvusServiceClient milvusClient;
+
+    // Fixed address of the Milvus server; fake_to_milvus.conf uses the same host.
+    private final String host = "172.30.10.80";
+
+    private final Integer port = 19530;
+
+    private final String username = "root";
+
+    private final String password = "Milvus";
+
+    private final String collectionName = "title_db";
+
+    /** Opens the Milvus client used to verify the job's output. */
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+        ConnectParam connectParam =
+                ConnectParam.newBuilder()
+                        .withHost(host)
+                        .withPort(port)
+                        .withAuthorization(username, password)
+                        .build();
+        milvusClient = new MilvusServiceClient(connectParam);
+    }
+
+    /** Closes the Milvus client if it was opened. */
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        if (milvusClient != null) {
+            milvusClient.close();
+        }
+    }
+
+    /** Runs the fake-source-to-Milvus job and checks the exit code and the query result. */
+    @TestTemplate
+    public void testMilvus(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake_to_milvus.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // NOTE(review): the query is built with a collection name only (no filter expression or
+        // output fields) - confirm that the SDK accepts this.
+        R query =
+                milvusClient.query(
+                        QueryParam.newBuilder().withCollectionName(collectionName).build());
+
+        // NOTE(review): getFieldsDataCount() looks like a count of returned fields (columns)
+        // rather than rows - confirm that 5 is the intended expectation.
+        Assertions.assertEquals(5, query.getData().getFieldsDataCount());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf
new file mode 100644
index 00000000000..223070fe744
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + + FakeSource { + schema = { + fields { + c_int = int + c_array = array + c_tinyint = tinyint + c_smallint = smallint + c_bigint = bigint + c_float = float + c_double = double + c_bol = "boolean" + } + } + result_table_name = "fake" + } + +} + +transform { +} + +sink { + Milvus { + milvus_host = 172.30.10.80 + milvus_port = 19530 + username = root + password = Milvus + collection_name = title_db + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 070ab6e936f..aa7c200ba4f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -58,6 +58,7 @@ connector-rocketmq-e2e connector-pulsar-e2e connector-paimon-e2e + connector-milvus-v2-e2e