diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md
new file mode 100644
index 00000000000..85ead00f647
--- /dev/null
+++ b/docs/en/connector-v2/sink/Milvus.md
@@ -0,0 +1,94 @@
+> Milvus sink connector
+
+## Support These Engines
+
+> Spark
+> Flink
+> SeaTunnel Zeta
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Write data to Apache milvus.
+
+## Sink Options
+
+| Name | Type | Required | Default | Description |
+|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------|
+| milvus_host | String | Yes | - | The milvus host. |
+| milvus_port | Int | No | 19530 | This port is for gRPC. Default is 19530. |
+| username | String | Yes | - | The username of milvus server. |
+| password | String | Yes | - | The password of milvus server. |
+| collection_name | String | No | - | A collection of milvus, which is similar to a table in a relational database. |
+| partition_field | String | No | - | Partition fields, which must be included in the collection's schema. |
+| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. Default is 'text-embedding-ada-002'. |
+| openai_api_key | String | No | - | Use your own Open AI API Key here. |
+| embeddings_fields | String | No | - | Fields to be embedded,They use`,`for splitting. |
+
+### Data Type Mapping
+
+| Milvus Data type | SeaTunnel Data type |
+|------------------|---------------------|
+| Bool | BOOLEAN |
+| Int8 | TINYINT |
+| Int16 | SMALLINT |
+| Int32 | INT |
+| Int64 | BIGINT |
+| Float | FLOAT |
+| Double | DOUBLE |
+| VarChar | DECIMAL |
+| String | STRING |
+
+## Examples
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ LocalFile {
+ schema {
+ fields {
+ bookID = string
+ title_1 = string
+ title_2 = string
+ }
+ }
+ path = "/tmp/milvus_test/book"
+ file_format_type = "csv"
+ }
+}
+
+transform {
+}
+
+sink {
+ Milvus {
+ milvus_host = localhost
+ milvus_port = 19530
+ username = root
+ password = Milvus
+ collection_name = title_db
+ openai_engine = text-embedding-ada-002
+ openai_api_key = sk-xxxx
+ embeddings_fields = title_2
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Milvus Sink Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index de6593b4523..d2a743a1021 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -110,3 +110,4 @@ seatunnel.source.Rocketmq = connector-rocketmq
seatunnel.sink.Rocketmq = connector-rocketmq
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
+seatunnel.sink.Milvus = connector-milvus
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml b/seatunnel-connectors-v2/connector-milvus/pom.xml
new file mode 100644
index 00000000000..42e84e3687d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+
+ connector-milvus
+ SeaTunnel : Connectors V2 : Milvus
+
+
+ 2.2.2
+ 0.12.0
+
+
+
+
+
+ io.milvus
+ milvus-sdk-java
+ ${milvus.version}
+
+
+
+ com.theokanning.openai-gpt3-java
+ service
+ ${openai.sdk.version}
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java
new file mode 100644
index 00000000000..f64b3fd731b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+public class MilvusOptions implements Serializable {
+
+ public static final Option MILVUS_HOST =
+ Options.key("milvus_host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The milvus host");
+
+ public static final Option MILVUS_PORT =
+ Options.key("milvus_port")
+ .intType()
+ .defaultValue(19530)
+ .withDescription("This port is for gRPC. Default is 19530");
+
+ public static final Option USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The username of milvus server.");
+
+ public static final Option PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The password of milvus server.");
+
+ public static final Option COLLECTION_NAME =
+ Options.key("collection_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "A collection of milvus, which is similar to a table in a relational database.");
+
+ public static final Option PARTITION_FIELD =
+ Options.key("partition_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Partition fields, which must be included in the collection's schema.");
+
+ public static final Option OPENAI_ENGINE =
+ Options.key("openai_engine")
+ .stringType()
+ .defaultValue("text-embedding-ada-002")
+ .withDescription("Text embedding model. Default is 'text-embedding-ada-002'");
+
+ public static final Option OPENAI_API_KEY =
+ Options.key("openai_api_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Use your own Open AI API Key here.");
+
+ public static final Option EMBEDDINGS_FIELDS =
+ Options.key("embeddings_fields")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Fields to be embedded,They use`,`for splitting");
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
new file mode 100644
index 00000000000..56aff7d6550
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+@Setter
+@Getter
+@ToString
+public class MilvusSinkConfig implements Serializable {
+
+ private String milvusHost;
+ private Integer milvusPort;
+ private String collectionName;
+ private String partitionField;
+ private String userName;
+ private String password;
+ private String openaiEngine;
+ private String openaiApiKey;
+ private String embeddingsFields;
+
+ public static MilvusSinkConfig of(ReadonlyConfig config) {
+ MilvusSinkConfig sinkConfig = new MilvusSinkConfig();
+ sinkConfig.setMilvusHost(config.get(MilvusOptions.MILVUS_HOST));
+ sinkConfig.setMilvusPort(config.get(MilvusOptions.MILVUS_PORT));
+ sinkConfig.setCollectionName(config.get(MilvusOptions.COLLECTION_NAME));
+ config.getOptional(MilvusOptions.PARTITION_FIELD).ifPresent(sinkConfig::setPartitionField);
+ config.getOptional(MilvusOptions.USERNAME).ifPresent(sinkConfig::setUserName);
+ config.getOptional(MilvusOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+ config.getOptional(MilvusOptions.OPENAI_ENGINE).ifPresent(sinkConfig::setOpenaiEngine);
+ config.getOptional(MilvusOptions.OPENAI_API_KEY).ifPresent(sinkConfig::setOpenaiApiKey);
+ config.getOptional(MilvusOptions.EMBEDDINGS_FIELDS)
+ .ifPresent(sinkConfig::setEmbeddingsFields);
+
+ return sinkConfig;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java
new file mode 100644
index 00000000000..96c163aaeb5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode {
+ RESPONSE_FAILED("Milvus-01", "The server returns an exception.");
+
+ private final String code;
+ private final String description;
+
+ MilvusConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
new file mode 100644
index 00000000000..88ee8d105a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class MilvusConnectorException extends SeaTunnelRuntimeException {
+ public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public MilvusConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
new file mode 100644
index 00000000000..6f266e46ed6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class MilvusSink extends AbstractSimpleSink {
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private MilvusSinkConfig milvusSinkConfig;
+
+ @Override
+ public String getPluginName() {
+ return "Milvus";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ milvusSinkConfig = MilvusSinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new MilvusSinkFactory().optionRule());
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter createWriter(SinkWriter.Context context) {
+ return new MilvusSinkWriter(milvusSinkConfig);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
new file mode 100644
index 00000000000..a256425d756
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MilvusSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Milvus";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ MilvusOptions.MILVUS_HOST,
+ MilvusOptions.MILVUS_PORT,
+ MilvusOptions.COLLECTION_NAME,
+ MilvusOptions.USERNAME,
+ MilvusOptions.PASSWORD)
+ .optional(
+ MilvusOptions.PARTITION_FIELD,
+ MilvusOptions.OPENAI_ENGINE,
+ MilvusOptions.OPENAI_API_KEY,
+ MilvusOptions.EMBEDDINGS_FIELDS)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
new file mode 100644
index 00000000000..5fe5ff9a504
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import com.theokanning.openai.embedding.EmbeddingRequest;
+import com.theokanning.openai.embedding.EmbeddingResult;
+import com.theokanning.openai.service.OpenAiService;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.FlushParam;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.response.DescCollResponseWrapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorErrorCode.RESPONSE_FAILED;
+
+public class MilvusSinkWriter extends AbstractSinkWriter {
+
+ private final MilvusServiceClient milvusClient;
+
+ private final MilvusSinkConfig milvusSinkConfig;
+
+ private OpenAiService service;
+
+ private final List metaFields;
+
+ public MilvusSinkWriter(MilvusSinkConfig milvusSinkConfig) {
+ this.milvusSinkConfig = milvusSinkConfig;
+ ConnectParam connectParam =
+ ConnectParam.newBuilder()
+ .withHost(milvusSinkConfig.getMilvusHost())
+ .withPort(milvusSinkConfig.getMilvusPort())
+ .withAuthorization(
+ milvusSinkConfig.getUserName(), milvusSinkConfig.getPassword())
+ .build();
+ milvusClient = new MilvusServiceClient(connectParam);
+
+ handleResponseStatus(
+ milvusClient.hasCollection(
+ HasCollectionParam.newBuilder()
+ .withCollectionName(milvusSinkConfig.getCollectionName())
+ .build()));
+
+ R describeCollectionResponseR =
+ milvusClient.describeCollection(
+ DescribeCollectionParam.newBuilder()
+ .withCollectionName(milvusSinkConfig.getCollectionName())
+ .build());
+
+ handleResponseStatus(describeCollectionResponseR);
+
+ DescCollResponseWrapper wrapper =
+ new DescCollResponseWrapper(describeCollectionResponseR.getData());
+
+ this.metaFields = wrapper.getFields();
+
+ if (milvusSinkConfig.getEmbeddingsFields() != null) {
+ service = new OpenAiService(milvusSinkConfig.getOpenaiApiKey());
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+
+ List fields = new ArrayList<>();
+
+ InsertParam.Builder builder = InsertParam.newBuilder();
+
+ builder = builder.withCollectionName(milvusSinkConfig.getCollectionName());
+
+ for (int i = 0; i < this.metaFields.size(); i++) {
+
+ FieldType fieldType = this.metaFields.get(i);
+
+ Object field = element.getField(i);
+ if (fieldType.isPrimaryKey()) {
+ if (!(field instanceof Long)
+ && !(field instanceof Integer)
+ && !(field instanceof Byte)
+ && !(field instanceof Short)
+ && !(field instanceof String)) {
+ throw new MilvusConnectorException(
+ ILLEGAL_ARGUMENT, "Primary key field only supports number and string.");
+ }
+ }
+
+ if (milvusSinkConfig.getPartitionField() != null
+ && milvusSinkConfig.getPartitionField().equals(fieldType.getName())) {
+ builder.withPartitionName(String.valueOf(field));
+ }
+ if (milvusSinkConfig.getEmbeddingsFields() != null) {
+ List embeddingsFields =
+ Arrays.asList(milvusSinkConfig.getEmbeddingsFields().split(","));
+ if (embeddingsFields.contains(fieldType.getName())) {
+ if (fieldType.getDataType() != DataType.BinaryVector
+ && fieldType.getDataType() != DataType.FloatVector) {
+ throw new MilvusConnectorException(
+ ILLEGAL_ARGUMENT, "Vector field only supports binary and float.");
+ }
+ EmbeddingResult embeddings =
+ service.createEmbeddings(
+ EmbeddingRequest.builder()
+ .model(milvusSinkConfig.getOpenaiEngine())
+ .input(Collections.singletonList(String.valueOf(field)))
+ .build());
+ List embedding = embeddings.getData().get(0).getEmbedding();
+ List collect =
+ embedding.stream().map(Double::floatValue).collect(Collectors.toList());
+ InsertParam.Field insertField =
+ new InsertParam.Field(
+ fieldType.getName(), Collections.singletonList(collect));
+ fields.add(insertField);
+ continue;
+ }
+ }
+
+ judgmentParameterType(fieldType, field);
+
+ InsertParam.Field insertField =
+ new InsertParam.Field(fieldType.getName(), Collections.singletonList(field));
+ fields.add(insertField);
+ }
+
+ InsertParam build = builder.withFields(fields).build();
+
+ handleResponseStatus(milvusClient.insert(build));
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) throws IOException {
+ milvusClient.flush(
+ FlushParam.newBuilder()
+ .addCollectionName(milvusSinkConfig.getCollectionName())
+ .build());
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws IOException {
+ milvusClient.close();
+ service.shutdownExecutor();
+ }
+
+ private void handleResponseStatus(R> r) {
+ if (r.getStatus() != R.Status.Success.getCode()) {
+ throw new MilvusConnectorException(RESPONSE_FAILED, r.getMessage(), r.getException());
+ }
+ }
+
+ private void judgmentParameterType(FieldType fieldType, Object value) {
+ switch (fieldType.getDataType()) {
+ case Bool:
+ if (!(value instanceof Boolean)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ case Int8:
+ case Int16:
+ case Int32:
+ if (!(value instanceof Integer)
+ && !(value instanceof Byte)
+ && !(value instanceof Short)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ case Int64:
+ if (!(value instanceof Long)
+ && !(value instanceof Integer)
+ && !(value instanceof Byte)
+ && !(value instanceof Short)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ case Float:
+ if (!(value instanceof Float)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ case Double:
+ if (!(value instanceof Float) && !(value instanceof Double)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ case VarChar:
+ if (!(value instanceof String)) {
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ break;
+ default:
+ throw new MilvusConnectorException(
+ UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription());
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 8af51a0d0e5..6f70ecd3323 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -73,6 +73,7 @@
connector-hbase
connector-rocketmq
connector-paimon
+ connector-milvus
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 80ef7209da3..bd8dfd2a4e2 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -509,6 +509,13 @@
provided
+
+ org.apache.seatunnel
+ connector-milvus
+ ${project.version}
+ provided
+
+
com.aliyun.phoenix
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml
new file mode 100644
index 00000000000..a88f2378438
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connector-v2-e2e
+ ${revision}
+
+
+ connector-milvus-v2-e2e
+
+ SeaTunnel : E2E : Connector V2 : Milvus
+
+
+ 2.2.2
+
+
+
+
+
+ org.apache.seatunnel
+ connector-fake
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-milvus
+ ${project.version}
+ test
+
+
+ io.milvus
+ milvus-sdk-java
+ ${milvus.version}
+ test
+
+
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java
new file mode 100644
index 00000000000..6c4e6580785
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.milvus;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.QueryResults;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.dml.QueryParam;
+
+import java.io.IOException;
+
+@Disabled
+public class MilvusIT extends TestSuiteBase implements TestResource {
+
+ private MilvusServiceClient milvusClient;
+
+ private final String host = "172.30.10.80";
+
+ private final Integer port = 19530;
+
+ private final String username = "root";
+
+ private final String password = "Milvus";
+
+ private final String collectionName = "title_db";
+
+ @BeforeEach
+ @Override
+ public void startUp() throws Exception {
+ ConnectParam connectParam =
+ ConnectParam.newBuilder()
+ .withHost(host)
+ .withPort(port)
+ .withAuthorization(username, password)
+ .build();
+ milvusClient = new MilvusServiceClient(connectParam);
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() throws Exception {
+ if (milvusClient != null) {
+ milvusClient.close();
+ }
+ }
+
+ @TestTemplate
+ public void testMilvus(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/fake_to_milvus.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ R query =
+ milvusClient.query(
+ QueryParam.newBuilder().withCollectionName(collectionName).build());
+
+ Assertions.assertEquals(5, query.getData().getFieldsDataCount());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf
new file mode 100644
index 00000000000..223070fe744
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+
+ FakeSource {
+ schema = {
+ fields {
+ c_int = int
+ c_array = array
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bol = "boolean"
+ }
+ }
+ result_table_name = "fake"
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Milvus {
+ milvus_host = 172.30.10.80
+ milvus_port = 19530
+ username = root
+ password = Milvus
+ collection_name = title_db
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 070ab6e936f..aa7c200ba4f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -58,6 +58,7 @@
connector-rocketmq-e2e
connector-pulsar-e2e
connector-paimon-e2e
+ connector-milvus-v2-e2e